diff --git a/main/crypto-aes/src/main/java/org/cryptomator/crypto/aes256/FileNamingConventions.java b/main/crypto-aes/src/main/java/org/cryptomator/crypto/aes256/FileNamingConventions.java
index d7b594d64..131e2d57a 100644
--- a/main/crypto-aes/src/main/java/org/cryptomator/crypto/aes256/FileNamingConventions.java
+++ b/main/crypto-aes/src/main/java/org/cryptomator/crypto/aes256/FileNamingConventions.java
@@ -63,4 +63,8 @@ interface FileNamingConventions {
*/
PathMatcher ENCRYPTED_FILE_GLOB_MATCHER = FileSystems.getDefault().getPathMatcher("glob:**/*{" + BASIC_FILE_EXT + "," + LONG_NAME_FILE_EXT + "}");
+ /**
+ * On OSX, folders with this extension are treated as a package.
+ */
+ String FOLDER_EXTENSION = ".cryptomator";
}
diff --git a/main/pom.xml b/main/pom.xml
index 5bd9ee4f9..7a2228307 100644
--- a/main/pom.xml
+++ b/main/pom.xml
@@ -32,8 +32,10 @@
4.0
3.3.2
1.10
- 2.4.4
-
+ 2.4.4
+ 1.10.19
+ 2.2.3
+
@@ -117,6 +119,19 @@
${junit.version}
test
+
+
+ org.mockito
+ mockito-core
+ ${mockito.version}
+ test
+
+
+
+ com.github.axet
+ desktop
+ ${axetDesktop.version}
+
@@ -137,6 +152,10 @@
junit
junit
+
+ org.mockito
+ mockito-core
+
diff --git a/main/ui/pom.xml b/main/ui/pom.xml
index e94484d42..a6dd4c297 100644
--- a/main/ui/pom.xml
+++ b/main/ui/pom.xml
@@ -19,7 +19,7 @@
Cryptomator
- org.cryptomator.ui.MainApplication
+ org.cryptomator.ui.Main
${java.home}/../lib/ant-javafx.jar
8.20.8
@@ -52,9 +52,14 @@
- org.controlsfx
- controlsfx
- ${controlsfx.version}
+ org.controlsfx
+ controlsfx
+ ${controlsfx.version}
+
+
+
+ com.github.axet
+ desktop
diff --git a/main/ui/src/main/java/org/cryptomator/ui/Main.java b/main/ui/src/main/java/org/cryptomator/ui/Main.java
new file mode 100644
index 000000000..104bb8834
--- /dev/null
+++ b/main/ui/src/main/java/org/cryptomator/ui/Main.java
@@ -0,0 +1,76 @@
+/*******************************************************************************
+ * Copyright (c) 2014 Sebastian Stenzel
+ * This file is licensed under the terms of the MIT license.
+ * See the LICENSE.txt file for more info.
+ *
+ * Contributors:
+ * Sebastian Stenzel - initial API and implementation
+ ******************************************************************************/
+package org.cryptomator.ui;
+
+import java.io.File;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+
+import javafx.application.Application;
+
+import org.controlsfx.tools.Platform;
+import org.cryptomator.ui.util.SingleInstanceManager;
+import org.cryptomator.ui.util.SingleInstanceManager.RemoteInstance;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.github.axet.desktop.os.mac.AppleHandlers;
+
+public class Main {
+ public static final Logger LOG = LoggerFactory.getLogger(MainApplication.class);
+
+ public static final CompletableFuture> openFileHandler = new CompletableFuture<>();
+
+ public static void main(String[] args) {
+ if (Platform.getCurrent().equals(org.controlsfx.tools.Platform.OSX)) {
+ /*
+ * On OSX we're in an awkward position. We need to register a
+ * handler in the main thread of this application. However, we can't
+ * even pass objects to the application, so we're forced to use a
+ * static CompletableFuture for the handler, which actually opens
+ * the file in the application.
+ */
+ try {
+ AppleHandlers.getAppleHandlers().addOpenFileListener(list -> {
+ try {
+ openFileHandler.get().accept(list);
+ } catch (Exception e) {
+ LOG.error("exception handling file open event", e);
+ throw new RuntimeException(e);
+ }
+ });
+ } catch (Throwable e) {
+ // Since we're trying to call OS-specific code, we'll just have
+ // to hope for the best.
+ LOG.error("exception adding OSX file open handler", e);
+ }
+ }
+
+ /*
+ * Before starting the application, we check if there is already an
+ * instance running on this computer. If so, we send our command line
+ * arguments to that instance and quit.
+ */
+ final Optional remoteInstance = SingleInstanceManager.getRemoteInstance(MainApplication.APPLICATION_KEY);
+
+ if (remoteInstance.isPresent()) {
+ try (RemoteInstance instance = remoteInstance.get()) {
+ LOG.info("An instance of Cryptomator is already running at {}.", instance.getRemotePort());
+ for (int i = 0; i < args.length; i++) {
+ remoteInstance.get().sendMessage(args[i], 1000);
+ }
+ } catch (Exception e) {
+ LOG.error("Error forwarding arguments to remote instance", e);
+ }
+ } else {
+ Application.launch(MainApplication.class, args);
+ }
+ }
+}
diff --git a/main/ui/src/main/java/org/cryptomator/ui/MainApplication.java b/main/ui/src/main/java/org/cryptomator/ui/MainApplication.java
index de5f564fb..8d004ac63 100644
--- a/main/ui/src/main/java/org/cryptomator/ui/MainApplication.java
+++ b/main/ui/src/main/java/org/cryptomator/ui/MainApplication.java
@@ -8,9 +8,12 @@
******************************************************************************/
package org.cryptomator.ui;
+import java.io.File;
import java.io.IOException;
import java.util.ResourceBundle;
import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import javafx.application.Application;
import javafx.application.Platform;
@@ -20,24 +23,37 @@ import javafx.scene.Scene;
import javafx.stage.Stage;
import org.apache.commons.lang3.SystemUtils;
+import org.cryptomator.crypto.aes256.Aes256Cryptor;
import org.cryptomator.ui.settings.Settings;
import org.cryptomator.ui.util.ActiveWindowStyleSupport;
+import org.cryptomator.ui.util.SingleInstanceManager;
+import org.cryptomator.ui.util.SingleInstanceManager.LocalInstance;
import org.cryptomator.ui.util.TrayIconUtil;
import org.cryptomator.webdav.WebDavServer;
import org.eclipse.jetty.util.ConcurrentHashSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class MainApplication extends Application {
private static final Set SHUTDOWN_TASKS = new ConcurrentHashSet<>();
private static final CleanShutdownPerformer CLEAN_SHUTDOWN_PERFORMER = new CleanShutdownPerformer();
- public static void main(String[] args) {
- Application.launch(args);
- Runtime.getRuntime().addShutdownHook(CLEAN_SHUTDOWN_PERFORMER);
- }
+ public static final String APPLICATION_KEY = "CryptomatorGUI";
+
+ private static final Logger LOG = LoggerFactory.getLogger(MainApplication.class);
+
+ ExecutorService executorService;
@Override
public void start(final Stage primaryStage) throws IOException {
+ Runtime.getRuntime().addShutdownHook(MainApplication.CLEAN_SHUTDOWN_PERFORMER);
+
+ executorService = Executors.newCachedThreadPool();
+ addShutdownTask(() -> {
+ executorService.shutdown();
+ });
+
WebDavServer.getInstance().start();
chooseNativeStylesheet();
final ResourceBundle rb = ResourceBundle.getBundle("localization");
@@ -55,6 +71,43 @@ public class MainApplication extends Application {
TrayIconUtil.init(primaryStage, rb, () -> {
quit();
});
+
+ for (String arg : getParameters().getUnnamed()) {
+ handleCommandLineArg(ctrl, arg);
+ }
+
+ if (org.controlsfx.tools.Platform.getCurrent().equals(org.controlsfx.tools.Platform.OSX)) {
+ Main.openFileHandler.complete(file -> handleCommandLineArg(ctrl, file.getAbsolutePath()));
+ }
+
+ LocalInstance cryptomatorGuiInstance = SingleInstanceManager.startLocalInstance(APPLICATION_KEY, executorService);
+ addShutdownTask(() -> {
+ cryptomatorGuiInstance.close();
+ });
+
+ cryptomatorGuiInstance.registerListener(arg -> handleCommandLineArg(ctrl, arg));
+ }
+
+ void handleCommandLineArg(final MainController ctrl, String arg) {
+ File file = new File(arg);
+ if (!file.exists()) {
+ if (!file.mkdirs()) {
+ return;
+ }
+ // directory created.
+ } else if (file.isFile()) {
+ if (file.getName().toLowerCase().endsWith(Aes256Cryptor.MASTERKEY_FILE_EXT.toLowerCase())) {
+ file = file.getParentFile();
+ } else {
+ // is a file, but not a masterkey file
+ return;
+ }
+ }
+ File f = file;
+ Platform.runLater(() -> {
+ ctrl.addDirectory(f);
+ ctrl.toFront();
+ });
}
private void chooseNativeStylesheet() {
@@ -95,7 +148,11 @@ public class MainApplication extends Application {
@Override
public void run() {
SHUTDOWN_TASKS.forEach(r -> {
- r.run();
+ try {
+ r.run();
+ } catch (Throwable e) {
+ LOG.error("exception while shutting down", e);
+ }
});
SHUTDOWN_TASKS.clear();
}
diff --git a/main/ui/src/main/java/org/cryptomator/ui/MainController.java b/main/ui/src/main/java/org/cryptomator/ui/MainController.java
index bf5bc3be9..3a9cf2719 100644
--- a/main/ui/src/main/java/org/cryptomator/ui/MainController.java
+++ b/main/ui/src/main/java/org/cryptomator/ui/MainController.java
@@ -75,6 +75,17 @@ public class MainController implements Initializable, InitializationListener, Un
private void didClickAddDirectory(ActionEvent event) {
final DirectoryChooser dirChooser = new DirectoryChooser();
final File file = dirChooser.showDialog(stage);
+ addDirectory(file);
+ }
+
+ /**
+ * adds the given directory or selects it if it is already in the list of
+ * directories.
+ *
+ * @param file
+ * non-null, writable, existing directory
+ */
+ void addDirectory(final File file) {
if (file != null && file.canWrite()) {
final Directory dir = new Directory(file.toPath());
if (!directoryList.getItems().contains(dir)) {
@@ -199,4 +210,13 @@ public class MainController implements Initializable, InitializationListener, Un
this.stage = stage;
}
+ /**
+ * Attempts to make the application window visible.
+ */
+ public void toFront() {
+ stage.setIconified(false);
+ stage.show();
+ stage.toFront();
+ }
+
}
diff --git a/main/ui/src/main/java/org/cryptomator/ui/model/Directory.java b/main/ui/src/main/java/org/cryptomator/ui/model/Directory.java
index 93c5f7c56..c2b80b0ef 100644
--- a/main/ui/src/main/java/org/cryptomator/ui/model/Directory.java
+++ b/main/ui/src/main/java/org/cryptomator/ui/model/Directory.java
@@ -116,7 +116,11 @@ public class Directory implements Serializable {
* @return Directory name without preceeding path components
*/
public String getName() {
- return path.getFileName().toString();
+ String name = path.getFileName().toString();
+ if (name.toLowerCase().endsWith(Aes256Cryptor.FOLDER_EXTENSION.toLowerCase())) {
+ name = name.substring(0, name.length() - Aes256Cryptor.FOLDER_EXTENSION.length());
+ }
+ return name;
}
public Cryptor getCryptor() {
diff --git a/main/ui/src/main/java/org/cryptomator/ui/util/ListenerRegistry.java b/main/ui/src/main/java/org/cryptomator/ui/util/ListenerRegistry.java
new file mode 100644
index 000000000..7b26509f1
--- /dev/null
+++ b/main/ui/src/main/java/org/cryptomator/ui/util/ListenerRegistry.java
@@ -0,0 +1,80 @@
+/*******************************************************************************
+ * Copyright (c) 2014 Sebastian Stenzel
+ * This file is licensed under the terms of the MIT license.
+ * See the LICENSE.txt file for more info.
+ *
+ * Contributors:
+ * Sebastian Stenzel - initial API and implementation
+ ******************************************************************************/
+package org.cryptomator.ui.util;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BiConsumer;
+
+/**
+ * Manages and broadcasts events to a set of listeners. The types of the
+ * listener and event are entirely unbound. Instead, a method must be supplied
+ * to broadcast an event to a single listener.
+ *
+ * @author Tillmann Gaida
+ *
+ * @param
+ * The type of listener.
+ * @param
+ * The type of event.
+ */
+public class ListenerRegistry {
+ final BiConsumer listenerCaller;
+
+ /**
+ * Constructs a new registry.
+ *
+ * @param listenerCaller
+ * The method which broadcasts an event to a single listener.
+ */
+ public ListenerRegistry(BiConsumer listenerCaller) {
+ super();
+ this.listenerCaller = listenerCaller;
+ }
+
+ /**
+ * The handle of a registered listener.
+ */
+ public interface ListenerRegistration {
+ void unregister();
+ }
+
+ final AtomicLong serial = new AtomicLong();
+ /*
+ * Since this is a {@link ConcurrentSkipListMap}, we can at the same time
+ * add to, remove from, and iterate over it. More importantly, a Listener
+ * can remove itself while being called from the {@link #broadcast(Object)}
+ * method.
+ */
+ final Map listeners = new ConcurrentSkipListMap<>();
+
+ public ListenerRegistration registerListener(LISTENER listener) {
+ final long s = serial.incrementAndGet();
+
+ listeners.put(s, listener);
+
+ return () -> {
+ listeners.remove(s);
+ };
+ }
+
+ /**
+ * Broadcasts the given event to all registered listeners. If a listener
+ * causes an unchecked exception, that exception is thrown immediately
+ * without calling the other listeners.
+ *
+ * @param event
+ */
+ public void broadcast(EVENT event) {
+ for (LISTENER listener : listeners.values()) {
+ listenerCaller.accept(listener, event);
+ }
+ }
+}
diff --git a/main/ui/src/main/java/org/cryptomator/ui/util/SingleInstanceManager.java b/main/ui/src/main/java/org/cryptomator/ui/util/SingleInstanceManager.java
new file mode 100644
index 000000000..455305880
--- /dev/null
+++ b/main/ui/src/main/java/org/cryptomator/ui/util/SingleInstanceManager.java
@@ -0,0 +1,367 @@
+/*******************************************************************************
+ * Copyright (c) 2014 Sebastian Stenzel
+ * This file is licensed under the terms of the MIT license.
+ * See the LICENSE.txt file for more info.
+ *
+ * Contributors:
+ * Sebastian Stenzel - initial API and implementation
+ ******************************************************************************/
+package org.cryptomator.ui.util;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.ClosedSelectorException;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.SelectableChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.nio.channels.WritableByteChannel;
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.prefs.Preferences;
+
+import org.cryptomator.ui.Main;
+import org.cryptomator.ui.util.ListenerRegistry.ListenerRegistration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Classes and methods to manage running this application in a mode, which only
+ * shows one instance.
+ *
+ * @author Tillmann Gaida
+ */
+public class SingleInstanceManager {
+ private static final Logger LOG = LoggerFactory.getLogger(SingleInstanceManager.class);
+
+ /**
+ * Connection to a running instance
+ */
+ public static class RemoteInstance implements Closeable {
+ final SocketChannel channel;
+
+ RemoteInstance(SocketChannel channel) {
+ super();
+ this.channel = channel;
+ }
+
+ /**
+ * Sends a message to the running instance.
+ *
+ * @param string
+ * May not be longer than 2^16 - 1 bytes.
+ * @param timeout
+ * timeout in milliseconds. this should be larger than the
+ * precision of {@link System#currentTimeMillis()}.
+ * @return true if the message was sent within the given timeout.
+ * @throws IOException
+ */
+ public boolean sendMessage(String string, long timeout) throws IOException {
+ Objects.requireNonNull(string);
+ byte[] message = string.getBytes();
+ if (message.length >= 256 * 256) {
+ throw new IOException("Message too long.");
+ }
+
+ ByteBuffer buf = ByteBuffer.allocate(message.length + 2);
+ buf.put((byte) (message.length / 256));
+ buf.put((byte) (message.length % 256));
+ buf.put(message);
+
+ buf.flip();
+ TimeoutTask.attempt(t -> {
+ if (channel.write(buf) < 0) {
+ return true;
+ }
+ return !buf.hasRemaining();
+ }, timeout, 10);
+ return !buf.hasRemaining();
+ }
+
+ @Override
+ public void close() throws IOException {
+ channel.close();
+ }
+
+ public int getRemotePort() throws IOException {
+ return ((InetSocketAddress) channel.getRemoteAddress()).getPort();
+ }
+ }
+
+ public static interface MessageListener {
+ void handleMessage(String message);
+ }
+
+ /**
+ * Represents a socket making this the main instance of the application.
+ */
+ public static abstract class LocalInstance implements Closeable {
+ private class ChannelState {
+ ByteBuffer write = ByteBuffer.wrap(applicationKey.getBytes());
+ ByteBuffer readLength = ByteBuffer.allocate(2);
+ ByteBuffer readMessage = null;
+ }
+
+ final ListenerRegistry registry = new ListenerRegistry<>(MessageListener::handleMessage);
+ final String applicationKey;
+
+ public LocalInstance(String applicationKey) {
+ Objects.requireNonNull(applicationKey);
+ this.applicationKey = applicationKey;
+ }
+
+ /**
+ * Register a listener for
+ *
+ * @param listener
+ * @return
+ */
+ public ListenerRegistration registerListener(MessageListener listener) {
+ Objects.requireNonNull(listener);
+ return registry.registerListener(listener);
+ }
+
+ void handle(SelectionKey key) throws IOException {
+ if (key.attachment() == null) {
+ key.attach(new ChannelState());
+ }
+
+ ChannelState state = (ChannelState) key.attachment();
+
+ if (key.isWritable() && state.write != null) {
+ ((WritableByteChannel) key.channel()).write(state.write);
+ if (!state.write.hasRemaining()) {
+ state.write = null;
+ }
+ LOG.debug("wrote welcome. switching to read only.");
+ key.interestOps(SelectionKey.OP_READ);
+ }
+
+ if (key.isReadable()) {
+ ByteBuffer buffer = state.readLength != null ? state.readLength : state.readMessage;
+
+ if (((ReadableByteChannel) key.channel()).read(buffer) < 0) {
+ key.cancel();
+ }
+
+ if (!buffer.hasRemaining()) {
+ buffer.flip();
+ if (state.readLength != null) {
+ int length = (buffer.get() + 256) % 256;
+ length = length * 256 + ((buffer.get() + 256) % 256);
+
+ state.readLength = null;
+ state.readMessage = ByteBuffer.allocate(length);
+ } else {
+ byte[] bytes = new byte[buffer.limit()];
+ buffer.get(bytes);
+
+ state.readMessage = null;
+ state.readLength = ByteBuffer.allocate(2);
+
+ registry.broadcast(new String(bytes, "UTF-8"));
+ }
+ }
+ }
+ }
+
+ public abstract void close();
+ }
+
+ /**
+ * Checks if there is a valid port at
+ * {@link Preferences#userNodeForPackage(Class)} for {@link Main} under the
+ * given applicationKey, tries to connect to the port at the loopback
+ * address and checks if the port identifies with the applicationKey.
+ *
+ * @param applicationKey
+ * key used to load the port and check the identity of the
+ * connection.
+ * @return
+ */
+ public static Optional getRemoteInstance(String applicationKey) {
+ Optional port = getSavedPort(applicationKey);
+
+ if (!port.isPresent()) {
+ return Optional.empty();
+ }
+
+ SocketChannel channel = null;
+ boolean close = true;
+ try {
+ channel = SocketChannel.open();
+ channel.configureBlocking(false);
+ LOG.info("connecting to instance {}", port.get());
+ channel.connect(new InetSocketAddress(InetAddress.getLoopbackAddress(), port.get()));
+
+ SocketChannel fChannel = channel;
+ if (!TimeoutTask.attempt(t -> fChannel.finishConnect(), 1000, 10)) {
+ return Optional.empty();
+ }
+
+ LOG.info("connected to instance {}", port.get());
+
+ final byte[] bytes = applicationKey.getBytes();
+ ByteBuffer buf = ByteBuffer.allocate(bytes.length);
+ tryFill(channel, buf, 1000);
+ if (buf.hasRemaining()) {
+ return Optional.empty();
+ }
+
+ buf.flip();
+
+ for (int i = 0; i < bytes.length; i++) {
+ if (buf.get() != bytes[i]) {
+ return Optional.empty();
+ }
+ }
+
+ close = false;
+ return Optional.of(new RemoteInstance(channel));
+ } catch (Exception e) {
+ return Optional.empty();
+ } finally {
+ try {
+ if (channel != null && close && channel.isOpen()) {
+ channel.close();
+ }
+ } catch (Exception e) {
+
+ }
+ }
+ }
+
+ static Optional getSavedPort(String applicationKey) {
+ int port = Preferences.userNodeForPackage(Main.class).getInt(applicationKey, -1);
+
+ if (port == -1) {
+ LOG.info("no running instance found");
+ return Optional.empty();
+ }
+
+ return Optional.of(port);
+ }
+
+ /**
+ * Creates a server socket on a free port and saves the port in
+ * {@link Preferences#userNodeForPackage(Class)} for {@link Main} under the
+ * given applicationKey.
+ *
+ * @param applicationKey
+ * key used to save the port and identify upon connection.
+ * @param exec
+ * the task which is submitted is interruptable.
+ * @return
+ * @throws IOException
+ */
+ @SuppressWarnings("resource")
+ public static LocalInstance startLocalInstance(String applicationKey, ExecutorService exec) throws IOException {
+ final ServerSocketChannel channel = ServerSocketChannel.open();
+ channel.configureBlocking(false);
+ channel.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
+
+ final int port = ((InetSocketAddress) channel.getLocalAddress()).getPort();
+ Preferences.userNodeForPackage(Main.class).putInt(applicationKey, port);
+ LOG.info("InstanceManager bound to port {}", port);
+
+ Selector selector = Selector.open();
+ channel.register(selector, SelectionKey.OP_ACCEPT);
+
+ LocalInstance instance = new LocalInstance(applicationKey) {
+ @Override
+ public void close() {
+ try {
+ selector.close();
+ } catch (IOException e) {
+ LOG.error("error closing socket", e);
+ }
+ try {
+ channel.close();
+ } catch (IOException e) {
+ LOG.error("error closing selector", e);
+ }
+ if (getSavedPort(applicationKey).orElse(-1).equals(port)) {
+ Preferences.userNodeForPackage(Main.class).remove(applicationKey);
+ }
+ }
+ };
+
+ exec.submit(() -> {
+ try {
+ final Set keysToRemove = new HashSet<>();
+ while (selector.select() > 0) {
+ final Set keys = selector.selectedKeys();
+ for (SelectionKey key : keys) {
+ if (Thread.interrupted()) {
+ return;
+ }
+ try {
+ // LOG.debug("selected {} {}", key.channel(),
+ // key.readyOps());
+ if (key.isAcceptable()) {
+ final SocketChannel accepted = channel.accept();
+ if (accepted != null) {
+ LOG.info("accepted incoming connection");
+ accepted.configureBlocking(false);
+ accepted.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
+ }
+ }
+ instance.handle(key);
+ } catch (IOException | IllegalStateException e) {
+ LOG.error("exception in selector", e);
+ } finally {
+ keysToRemove.add(key);
+ }
+ }
+ keys.removeAll(keysToRemove);
+ }
+ } catch (ClosedSelectorException e) {
+ return;
+ } catch (Throwable e) {
+ LOG.error("error while selecting", e);
+ }
+ });
+
+ return instance;
+ }
+
+ /**
+ * tries to fill the given buffer for the given time
+ *
+ * @param channel
+ * @param buf
+ * @param timeout
+ * @throws ClosedChannelException
+ * @throws IOException
+ */
+ public static void tryFill(T channel, final ByteBuffer buf, int timeout) throws IOException {
+ if (channel.isBlocking()) {
+ throw new IllegalStateException("Channel is in blocking mode.");
+ }
+
+ try (Selector selector = Selector.open()) {
+ channel.register(selector, SelectionKey.OP_READ);
+
+ TimeoutTask.attempt(remainingTime -> {
+ if (!buf.hasRemaining()) {
+ return true;
+ }
+ if (selector.select(remainingTime) > 0) {
+ if (channel.read(buf) < 0) {
+ return true;
+ }
+ }
+ return !buf.hasRemaining();
+ }, timeout, 1);
+ }
+ }
+}
\ No newline at end of file
diff --git a/main/ui/src/main/java/org/cryptomator/ui/util/TimeoutTask.java b/main/ui/src/main/java/org/cryptomator/ui/util/TimeoutTask.java
new file mode 100644
index 000000000..db9c492d0
--- /dev/null
+++ b/main/ui/src/main/java/org/cryptomator/ui/util/TimeoutTask.java
@@ -0,0 +1,81 @@
+/*******************************************************************************
+ * Copyright (c) 2014 Sebastian Stenzel
+ * This file is licensed under the terms of the MIT license.
+ * See the LICENSE.txt file for more info.
+ *
+ * Contributors:
+ * Sebastian Stenzel - initial API and implementation
+ ******************************************************************************/
+package org.cryptomator.ui.util;
+
+/**
+ * A task which is supposed to be repeated until it succeeds.
+ *
+ * @author Tillmann Gaida
+ *
+ * @param
+ * The type of checked exception that this task may throw.
+ */
+public interface TimeoutTask {
+ /**
+ * Attempts to execute the task.
+ *
+ * @param timeout
+ * The time remaining to finish the task.
+ * @return true if the task finished, false if it needs to be attempted
+ * again.
+ * @throws E
+ * @throws InterruptedException
+ */
+ boolean attempt(long timeout) throws E, InterruptedException;
+
+ /**
+ * Attempts a task until a timeout occurs. Checks for this timeout are based
+ * on {@link System#currentTimeMillis()}, so they are very crude. The task
+ * is guaranteed to be attempted once.
+ *
+ * @param task
+ * the task to perform.
+ * @param timeout
+ * time in millis before this method stops attempting to finish
+ * the task. greater than zero.
+ * @param sleepTimes
+ * time in millis to sleep between attempts. greater than zero.
+ * @return true if the task was finished, false if the task never always
+ * returned false or as soon as the task throws an
+ * {@link InterruptedException}.
+ * @throws E
+ * From the task.
+ */
+ public static boolean attempt(TimeoutTask task, long timeout, long sleepTimes) throws E {
+ if (timeout <= 0 || sleepTimes <= 0) {
+ throw new IllegalArgumentException();
+ }
+
+ long currentTime = System.currentTimeMillis();
+
+ long tryUntil = currentTime + timeout;
+
+ for (;; currentTime = System.currentTimeMillis()) {
+ if (currentTime >= tryUntil) {
+ return false;
+ }
+
+ try {
+ if (task.attempt(tryUntil - currentTime)) {
+ return true;
+ }
+
+ currentTime = System.currentTimeMillis();
+
+ if (currentTime + sleepTimes < tryUntil) {
+ Thread.sleep(sleepTimes);
+ } else {
+ return false;
+ }
+ } catch (InterruptedException e) {
+ return false;
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/main/ui/src/test/java/org/cryptomator/ui/util/ListenerRegistryTest.java b/main/ui/src/test/java/org/cryptomator/ui/util/ListenerRegistryTest.java
new file mode 100644
index 000000000..1b04b4f36
--- /dev/null
+++ b/main/ui/src/test/java/org/cryptomator/ui/util/ListenerRegistryTest.java
@@ -0,0 +1,45 @@
+package org.cryptomator.ui.util;
+
+import static org.junit.Assert.*;
+
+import java.util.Iterator;
+import java.util.concurrent.ConcurrentSkipListMap;
+
+import org.junit.Test;
+
+public class ListenerRegistryTest {
+ /**
+ * This test looks at how concurrent modifications affect the iterator of a
+ * {@link ConcurrentSkipListMap}. It shows that concurrent modifications
+ * work just fine, however the state of the iterator including the next
+ * value are advanced during retrieval of a value, so it's not possible to
+ * remove the next value.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testConcurrentSkipListMap() throws Exception {
+ ConcurrentSkipListMap map = new ConcurrentSkipListMap<>();
+
+ map.put(1, 1);
+ map.put(2, 2);
+ map.put(3, 3);
+ map.put(4, 4);
+ map.put(5, 5);
+
+ final Iterator iterator = map.values().iterator();
+
+ assertTrue(iterator.hasNext());
+ assertEquals((Integer) 1, iterator.next());
+ map.remove(2);
+ assertTrue(iterator.hasNext());
+ // iterator returns 2 anyway.
+ assertEquals((Integer) 2, iterator.next());
+ assertTrue(iterator.hasNext());
+ map.remove(4);
+ assertEquals((Integer) 3, iterator.next());
+ assertTrue(iterator.hasNext());
+ // this time we removed 4 before retrieving 3, so it is skipped.
+ assertEquals((Integer) 5, iterator.next());
+ }
+}
diff --git a/main/ui/src/test/java/org/cryptomator/ui/util/SingleInstanceManagerTest.java b/main/ui/src/test/java/org/cryptomator/ui/util/SingleInstanceManagerTest.java
new file mode 100644
index 000000000..acf6ce6a0
--- /dev/null
+++ b/main/ui/src/test/java/org/cryptomator/ui/util/SingleInstanceManagerTest.java
@@ -0,0 +1,193 @@
+package org.cryptomator.ui.util;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ForkJoinTask;
+import java.util.concurrent.TimeUnit;
+
+import org.cryptomator.ui.util.SingleInstanceManager.LocalInstance;
+import org.cryptomator.ui.util.SingleInstanceManager.MessageListener;
+import org.cryptomator.ui.util.SingleInstanceManager.RemoteInstance;
+import org.junit.Test;
+
+public class SingleInstanceManagerTest {
+ @Test(timeout = 1000)
+ public void testTryFillTimeout() throws Exception {
+ try (final ServerSocket socket = new ServerSocket(0)) {
+ // we need to asynchronously accept the connection
+ final ForkJoinTask> forked = ForkJoinTask.adapt(() -> {
+ try {
+ socket.setSoTimeout(1000);
+
+ socket.accept();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }).fork();
+
+ try (SocketChannel channel = SocketChannel.open()) {
+ channel.configureBlocking(false);
+ channel.connect(new InetSocketAddress(InetAddress.getLoopbackAddress(), socket.getLocalPort()));
+ TimeoutTask.attempt(t -> channel.finishConnect(), 100, 1);
+ final ByteBuffer buffer = ByteBuffer.allocate(1);
+ SingleInstanceManager.tryFill(channel, buffer, 100);
+ assertTrue(buffer.hasRemaining());
+ }
+
+ forked.join();
+ }
+ }
+
+ @Test(timeout = 1000)
+ public void testTryFill() throws Exception {
+ try (final ServerSocket socket = new ServerSocket(0)) {
+ // we need to asynchronously accept the connection
+ final ForkJoinTask> forked = ForkJoinTask.adapt(() -> {
+ try {
+ socket.setSoTimeout(1000);
+
+ socket.accept().getOutputStream().write(1);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }).fork();
+
+ try (SocketChannel channel = SocketChannel.open()) {
+ channel.configureBlocking(false);
+ channel.connect(new InetSocketAddress(InetAddress.getLoopbackAddress(), socket.getLocalPort()));
+ TimeoutTask.attempt(t -> channel.finishConnect(), 100, 1);
+ final ByteBuffer buffer = ByteBuffer.allocate(1);
+ SingleInstanceManager.tryFill(channel, buffer, 100);
+ assertFalse(buffer.hasRemaining());
+ }
+
+ forked.join();
+ }
+ }
+
+ String appKey = "APPKEY";
+
+ @Test
+ public void testOneMessage() throws Exception {
+ ExecutorService exec = Executors.newCachedThreadPool();
+
+ try {
+ final LocalInstance server = SingleInstanceManager.startLocalInstance(appKey, exec);
+ final Optional r = SingleInstanceManager.getRemoteInstance(appKey);
+
+ CountDownLatch latch = new CountDownLatch(1);
+
+ final MessageListener listener = spy(new MessageListener() {
+ @Override
+ public void handleMessage(String message) {
+ latch.countDown();
+ }
+ });
+ server.registerListener(listener);
+
+ assertTrue(r.isPresent());
+
+ String message = "Is this thing on?";
+ assertTrue(r.get().sendMessage(message, 100));
+ System.out.println("wrote message");
+
+ latch.await(10, TimeUnit.SECONDS);
+
+ verify(listener).handleMessage(message);
+ } finally {
+ exec.shutdownNow();
+ }
+ }
+
+ @Test(timeout = 60000)
+ public void testALotOfMessages() throws Exception {
+ final int connectors = 256;
+ final int messagesPerConnector = 256;
+
+ ExecutorService exec = Executors.newSingleThreadExecutor();
+ ExecutorService exec2 = Executors.newFixedThreadPool(16);
+
+ try (final LocalInstance server = SingleInstanceManager.startLocalInstance(appKey, exec)) {
+
+ Set sentMessages = new ConcurrentSkipListSet<>();
+ Set receivedMessages = new HashSet<>();
+
+ CountDownLatch sendLatch = new CountDownLatch(connectors);
+ CountDownLatch receiveLatch = new CountDownLatch(connectors * messagesPerConnector);
+
+ server.registerListener(message -> {
+ receivedMessages.add(message);
+ receiveLatch.countDown();
+ });
+
+ Set instances = Collections.synchronizedSet(new HashSet<>());
+
+ for (int i = 0; i < connectors; i++) {
+ exec2.submit(() -> {
+ try {
+ final Optional r = SingleInstanceManager.getRemoteInstance(appKey);
+ assertTrue(r.isPresent());
+ instances.add(r.get());
+
+ for (int j = 0; j < messagesPerConnector; j++) {
+ exec2.submit(() -> {
+ try {
+ for (;;) {
+ final String message = UUID.randomUUID().toString();
+ if (!sentMessages.add(message)) {
+ continue;
+ }
+ r.get().sendMessage(message, 100);
+ break;
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ });
+ }
+
+ sendLatch.countDown();
+ } catch (Throwable e) {
+ e.printStackTrace();
+ }
+ });
+ }
+
+ assertTrue(sendLatch.await(1, TimeUnit.MINUTES));
+
+ exec2.shutdown();
+ assertTrue(exec2.awaitTermination(1, TimeUnit.MINUTES));
+
+ assertTrue(receiveLatch.await(1, TimeUnit.MINUTES));
+
+ assertEquals(sentMessages, receivedMessages);
+
+ for (RemoteInstance remoteInstance : instances) {
+ try {
+ remoteInstance.close();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ } finally {
+ exec.shutdownNow();
+ exec2.shutdownNow();
+ }
+ }
+}