This commit is contained in:
aditya.chandel
2024-12-18 17:41:30 -07:00
parent de8846ea8e
commit 52f2f327c7
3 changed files with 58 additions and 11 deletions

View File

@@ -27,4 +27,7 @@ public class Library {
@OneToMany(mappedBy = "library", cascade = CascadeType.ALL, orphanRemoval = true)
private List<Book> books;
@Column(name = "initial_processed")
private boolean initialProcessed;
}

View File

@@ -14,9 +14,11 @@ import com.adityachandel.booklore.transformer.BookTransformer;
import com.adityachandel.booklore.transformer.LibraryTransformer;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.catalina.connector.ClientAbortException;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageRequest;
import org.springframework.stereotype.Service;
import org.springframework.web.context.request.async.AsyncRequestNotUsableException;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.io.IOException;
@@ -26,6 +28,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
@Slf4j
@Service
@@ -73,26 +76,66 @@ public class LibraryService {
return bookPage.map(BookTransformer::convertToBookDTO);
}
public SseEmitter parseLibraryBooks(long libraryId, boolean force) {
Library library = libraryRepository.findById(libraryId).orElseThrow(() -> ApiError.LIBRARY_NOT_FOUND.createException(libraryId));
if(library.isInitialProcessed()) {
SseEmitter emitter = new SseEmitter();
emitter.complete();
return emitter;
}
SseEmitter emitter = new SseEmitter();
ExecutorService sseMvcExecutor = Executors.newSingleThreadExecutor();
final AtomicBoolean isCompleted = new AtomicBoolean(false);
emitter.onTimeout(() -> {
log.info("Emitter timeout reached. Stopping further processing.");
emitter.complete();
isCompleted.set(true);
});
emitter.onCompletion(() -> {
log.info("Emitter completed. Shutting down executor.");
isCompleted.set(true);
sseMvcExecutor.shutdown();
});
sseMvcExecutor.execute(() -> {
try {
List<LibraryFile> libraryFiles = getLibraryFiles(library);
for (LibraryFile libraryFile : libraryFiles) {
log.info(libraryFile.getFilePath());
FileProcessResult fileProcessResult = processLibraryFile(libraryFile);
ParseLibraryEvent event = createParseLibraryEvent(libraryId, fileProcessResult);
emitter.send(event);
try {
log.info("Processing file: {}", libraryFile.getFilePath());
FileProcessResult fileProcessResult = processLibraryFile(libraryFile);
ParseLibraryEvent event = createParseLibraryEvent(libraryId, fileProcessResult);
if (!isCompleted.get()) {
emitter.send(event);
}
Thread.sleep(500);
} catch (AsyncRequestNotUsableException | IllegalStateException e) {
log.warn("Client disconnected! Continue processing.");
} catch (IOException e) {
log.warn("Client disconnected or failed to send response: {}", libraryFile.getFilePath(), e);
emitter.completeWithError(e);
sseMvcExecutor.shutdown();
return;
} catch (Exception e) {
log.error("Error processing file: {}", libraryFile.getFilePath(), e);
emitter.completeWithError(e);
sseMvcExecutor.shutdown();
return;
}
}
log.info("Finished processing library files");
library.setInitialProcessed(true);
libraryRepository.save(library);
emitter.complete();
log.info("emitter.complete()");
sseMvcExecutor.shutdown();
} catch (Exception ex) {
emitter.completeWithError(ex);
log.error("Error during file processing: ", ex);
if (!isCompleted.get()) {
emitter.completeWithError(ex);
sseMvcExecutor.shutdown();
}
} finally {
log.info("sseMvcExecutor.shutdown()");
log.info("File processing complete. Executor shutting down.");
emitter.complete();
sseMvcExecutor.shutdown();
}
});

View File

@@ -1,8 +1,9 @@
CREATE TABLE IF NOT EXISTS library
(
id BIGINT AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(255) UNIQUE NOT NULL,
paths TEXT
id BIGINT AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(255) UNIQUE NOT NULL,
paths TEXT,
initial_processed BOOLEAN DEFAULT false
);
CREATE TABLE IF NOT EXISTS book