From d1a35d5b285443aef1d981df450083ac4010ee11 Mon Sep 17 00:00:00 2001 From: Antonio Andelic Date: Fri, 6 Dec 2024 13:17:58 +0100 Subject: [PATCH 1/2] Properly cancel inserts in DistributedAsyncInsertDirectoryQueue --- .../DistributedAsyncInsertDirectoryQueue.cpp | 12 ++++++------ .../DistributedAsyncInsertDirectoryQueue.h | 4 ++-- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/src/Storages/Distributed/DistributedAsyncInsertDirectoryQueue.cpp b/src/Storages/Distributed/DistributedAsyncInsertDirectoryQueue.cpp index 377919366b5..993f9e229e5 100644 --- a/src/Storages/Distributed/DistributedAsyncInsertDirectoryQueue.cpp +++ b/src/Storages/Distributed/DistributedAsyncInsertDirectoryQueue.cpp @@ -169,7 +169,7 @@ void DistributedAsyncInsertDirectoryQueue::flushAllData(const SettingsChanges & std::lock_guard lock{mutex}; if (!hasPendingFiles()) return; - processFiles(settings_changes); + processFiles(settings_changes, /*force=*/true); } void DistributedAsyncInsertDirectoryQueue::shutdownAndDropAllData() @@ -380,18 +380,18 @@ void DistributedAsyncInsertDirectoryQueue::initializeFilesFromDisk() status.broken_bytes_count = broken_bytes_count; } } -void DistributedAsyncInsertDirectoryQueue::processFiles(const SettingsChanges & settings_changes) +void DistributedAsyncInsertDirectoryQueue::processFiles(const SettingsChanges & settings_changes, bool force) try { if (should_batch_inserts) - processFilesWithBatching(settings_changes); + processFilesWithBatching(settings_changes, force); else { /// Process unprocessed file. if (!current_file.empty()) processFile(current_file, settings_changes); - while (!pending_files.isFinished() && pending_files.tryPop(current_file)) + while ((force || !monitor_blocker.isCancelled()) && !pending_files.isFinished() && pending_files.tryPop(current_file)) processFile(current_file, settings_changes); } @@ -539,7 +539,7 @@ DistributedAsyncInsertDirectoryQueue::Status DistributedAsyncInsertDirectoryQueu return current_status; } -void DistributedAsyncInsertDirectoryQueue::processFilesWithBatching(const SettingsChanges & settings_changes) +void DistributedAsyncInsertDirectoryQueue::processFilesWithBatching(const SettingsChanges & settings_changes, bool force) { /// Possibly, we failed to send a batch on the previous iteration. Try to send exactly the same batch. if (fs::exists(current_batch_file_path)) @@ -569,7 +569,7 @@ void DistributedAsyncInsertDirectoryQueue::processFilesWithBatching(const Settin try { - while (pending_files.tryPop(file_path)) + while ((force || !monitor_blocker.isCancelled()) && !pending_files.isFinished() && pending_files.tryPop(file_path)) { if (!fs::exists(file_path)) { diff --git a/src/Storages/Distributed/DistributedAsyncInsertDirectoryQueue.h b/src/Storages/Distributed/DistributedAsyncInsertDirectoryQueue.h index 972e6b3143a..6fcbaf9f117 100644 --- a/src/Storages/Distributed/DistributedAsyncInsertDirectoryQueue.h +++ b/src/Storages/Distributed/DistributedAsyncInsertDirectoryQueue.h @@ -100,9 +100,9 @@ private: void addFile(const std::string & file_path); void initializeFilesFromDisk(); - void processFiles(const SettingsChanges & settings_changes = {}); + void processFiles(const SettingsChanges & settings_changes = {}, bool force = false); void processFile(std::string & file_path, const SettingsChanges & settings_changes); - void processFilesWithBatching(const SettingsChanges & settings_changes); + void processFilesWithBatching(const SettingsChanges & settings_changes, bool force); void markAsBroken(const std::string & file_path); void markAsSend(const std::string & file_path); From 6c6e1118c5601e3f54232908ba24baf6424995ed Mon Sep 17 00:00:00 2001 From: Antonio Andelic Date: Fri, 6 Dec 2024 13:43:30 +0100 Subject: [PATCH 2/2] Better --- .../DistributedAsyncInsertDirectoryQueue.cpp | 10 +++++----- .../Distributed/DistributedAsyncInsertDirectoryQueue.h | 5 +++-- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/src/Storages/Distributed/DistributedAsyncInsertDirectoryQueue.cpp b/src/Storages/Distributed/DistributedAsyncInsertDirectoryQueue.cpp index 993f9e229e5..480efd11ba1 100644 --- a/src/Storages/Distributed/DistributedAsyncInsertDirectoryQueue.cpp +++ b/src/Storages/Distributed/DistributedAsyncInsertDirectoryQueue.cpp @@ -169,7 +169,7 @@ void DistributedAsyncInsertDirectoryQueue::flushAllData(const SettingsChanges & std::lock_guard lock{mutex}; if (!hasPendingFiles()) return; - processFiles(settings_changes, /*force=*/true); + processFiles(/*force=*/true, settings_changes); } void DistributedAsyncInsertDirectoryQueue::shutdownAndDropAllData() @@ -212,7 +212,7 @@ void DistributedAsyncInsertDirectoryQueue::run() { try { - processFiles(); + processFiles(/*force=*/false); /// No errors while processing existing files. /// Let's see maybe there are more files to process. do_sleep = false; @@ -380,11 +380,11 @@ void DistributedAsyncInsertDirectoryQueue::initializeFilesFromDisk() status.broken_bytes_count = broken_bytes_count; } } -void DistributedAsyncInsertDirectoryQueue::processFiles(const SettingsChanges & settings_changes, bool force) +void DistributedAsyncInsertDirectoryQueue::processFiles(bool force, const SettingsChanges & settings_changes) try { if (should_batch_inserts) - processFilesWithBatching(settings_changes, force); + processFilesWithBatching(force, settings_changes); else { /// Process unprocessed file. @@ -539,7 +539,7 @@ DistributedAsyncInsertDirectoryQueue::Status DistributedAsyncInsertDirectoryQueu return current_status; } -void DistributedAsyncInsertDirectoryQueue::processFilesWithBatching(const SettingsChanges & settings_changes, bool force) +void DistributedAsyncInsertDirectoryQueue::processFilesWithBatching(bool force, const SettingsChanges & settings_changes) { /// Possibly, we failed to send a batch on the previous iteration. Try to send exactly the same batch. if (fs::exists(current_batch_file_path)) diff --git a/src/Storages/Distributed/DistributedAsyncInsertDirectoryQueue.h b/src/Storages/Distributed/DistributedAsyncInsertDirectoryQueue.h index 6fcbaf9f117..023edb01f5a 100644 --- a/src/Storages/Distributed/DistributedAsyncInsertDirectoryQueue.h +++ b/src/Storages/Distributed/DistributedAsyncInsertDirectoryQueue.h @@ -100,9 +100,10 @@ private: void addFile(const std::string & file_path); void initializeFilesFromDisk(); - void processFiles(const SettingsChanges & settings_changes = {}, bool force = false); + /// Set `force = true` if processing of files must be finished fully despite cancellation flag being set + void processFiles(bool force, const SettingsChanges & settings_changes = {}); void processFile(std::string & file_path, const SettingsChanges & settings_changes); - void processFilesWithBatching(const SettingsChanges & settings_changes, bool force); + void processFilesWithBatching(bool force, const SettingsChanges & settings_changes); void markAsBroken(const std::string & file_path); void markAsSend(const std::string & file_path);