diff --git a/src/Storages/Distributed/DistributedAsyncInsertDirectoryQueue.cpp b/src/Storages/Distributed/DistributedAsyncInsertDirectoryQueue.cpp index 377919366b5..993f9e229e5 100644 --- a/src/Storages/Distributed/DistributedAsyncInsertDirectoryQueue.cpp +++ b/src/Storages/Distributed/DistributedAsyncInsertDirectoryQueue.cpp @@ -169,7 +169,7 @@ void DistributedAsyncInsertDirectoryQueue::flushAllData(const SettingsChanges & std::lock_guard lock{mutex}; if (!hasPendingFiles()) return; - processFiles(settings_changes); + processFiles(settings_changes, /*force=*/true); } void DistributedAsyncInsertDirectoryQueue::shutdownAndDropAllData() @@ -380,18 +380,18 @@ void DistributedAsyncInsertDirectoryQueue::initializeFilesFromDisk() status.broken_bytes_count = broken_bytes_count; } } -void DistributedAsyncInsertDirectoryQueue::processFiles(const SettingsChanges & settings_changes) +void DistributedAsyncInsertDirectoryQueue::processFiles(const SettingsChanges & settings_changes, bool force) try { if (should_batch_inserts) - processFilesWithBatching(settings_changes); + processFilesWithBatching(settings_changes, force); else { /// Process unprocessed file. if (!current_file.empty()) processFile(current_file, settings_changes); - while (!pending_files.isFinished() && pending_files.tryPop(current_file)) + while ((force || !monitor_blocker.isCancelled()) && !pending_files.isFinished() && pending_files.tryPop(current_file)) processFile(current_file, settings_changes); } @@ -539,7 +539,7 @@ DistributedAsyncInsertDirectoryQueue::Status DistributedAsyncInsertDirectoryQueu return current_status; } -void DistributedAsyncInsertDirectoryQueue::processFilesWithBatching(const SettingsChanges & settings_changes) +void DistributedAsyncInsertDirectoryQueue::processFilesWithBatching(const SettingsChanges & settings_changes, bool force) { /// Possibly, we failed to send a batch on the previous iteration. Try to send exactly the same batch. if (fs::exists(current_batch_file_path)) @@ -569,7 +569,7 @@ void DistributedAsyncInsertDirectoryQueue::processFilesWithBatching(const Settin try { - while (pending_files.tryPop(file_path)) + while ((force || !monitor_blocker.isCancelled()) && !pending_files.isFinished() && pending_files.tryPop(file_path)) { if (!fs::exists(file_path)) { diff --git a/src/Storages/Distributed/DistributedAsyncInsertDirectoryQueue.h b/src/Storages/Distributed/DistributedAsyncInsertDirectoryQueue.h index 972e6b3143a..6fcbaf9f117 100644 --- a/src/Storages/Distributed/DistributedAsyncInsertDirectoryQueue.h +++ b/src/Storages/Distributed/DistributedAsyncInsertDirectoryQueue.h @@ -100,9 +100,9 @@ private: void addFile(const std::string & file_path); void initializeFilesFromDisk(); - void processFiles(const SettingsChanges & settings_changes = {}); + void processFiles(const SettingsChanges & settings_changes = {}, bool force = false); void processFile(std::string & file_path, const SettingsChanges & settings_changes); - void processFilesWithBatching(const SettingsChanges & settings_changes); + void processFilesWithBatching(const SettingsChanges & settings_changes, bool force); void markAsBroken(const std::string & file_path); void markAsSend(const std::string & file_path);