Merge pull request #72885 from ClickHouse/cancel-sends-distributed-insert-queue

Properly cancel inserts in DistributedAsyncInsertDirectoryQueue
This commit is contained in:
Antonio Andelic 2024-12-06 18:37:00 +00:00 committed by GitHub
commit bfe3908d49
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 10 additions and 9 deletions

View File

@ -169,7 +169,7 @@ void DistributedAsyncInsertDirectoryQueue::flushAllData(const SettingsChanges &
std::lock_guard lock{mutex}; std::lock_guard lock{mutex};
if (!hasPendingFiles()) if (!hasPendingFiles())
return; return;
processFiles(settings_changes); processFiles(/*force=*/true, settings_changes);
} }
void DistributedAsyncInsertDirectoryQueue::shutdownAndDropAllData() void DistributedAsyncInsertDirectoryQueue::shutdownAndDropAllData()
@ -212,7 +212,7 @@ void DistributedAsyncInsertDirectoryQueue::run()
{ {
try try
{ {
processFiles(); processFiles(/*force=*/false);
/// No errors while processing existing files. /// No errors while processing existing files.
/// Let's see maybe there are more files to process. /// Let's see maybe there are more files to process.
do_sleep = false; do_sleep = false;
@ -380,18 +380,18 @@ void DistributedAsyncInsertDirectoryQueue::initializeFilesFromDisk()
status.broken_bytes_count = broken_bytes_count; status.broken_bytes_count = broken_bytes_count;
} }
} }
void DistributedAsyncInsertDirectoryQueue::processFiles(const SettingsChanges & settings_changes) void DistributedAsyncInsertDirectoryQueue::processFiles(bool force, const SettingsChanges & settings_changes)
try try
{ {
if (should_batch_inserts) if (should_batch_inserts)
processFilesWithBatching(settings_changes); processFilesWithBatching(force, settings_changes);
else else
{ {
/// Process unprocessed file. /// Process unprocessed file.
if (!current_file.empty()) if (!current_file.empty())
processFile(current_file, settings_changes); processFile(current_file, settings_changes);
while (!pending_files.isFinished() && pending_files.tryPop(current_file)) while ((force || !monitor_blocker.isCancelled()) && !pending_files.isFinished() && pending_files.tryPop(current_file))
processFile(current_file, settings_changes); processFile(current_file, settings_changes);
} }
@ -539,7 +539,7 @@ DistributedAsyncInsertDirectoryQueue::Status DistributedAsyncInsertDirectoryQueu
return current_status; return current_status;
} }
void DistributedAsyncInsertDirectoryQueue::processFilesWithBatching(const SettingsChanges & settings_changes) void DistributedAsyncInsertDirectoryQueue::processFilesWithBatching(bool force, const SettingsChanges & settings_changes)
{ {
/// Possibly, we failed to send a batch on the previous iteration. Try to send exactly the same batch. /// Possibly, we failed to send a batch on the previous iteration. Try to send exactly the same batch.
if (fs::exists(current_batch_file_path)) if (fs::exists(current_batch_file_path))
@ -569,7 +569,7 @@ void DistributedAsyncInsertDirectoryQueue::processFilesWithBatching(const Settin
try try
{ {
while (pending_files.tryPop(file_path)) while ((force || !monitor_blocker.isCancelled()) && !pending_files.isFinished() && pending_files.tryPop(file_path))
{ {
if (!fs::exists(file_path)) if (!fs::exists(file_path))
{ {

View File

@ -100,9 +100,10 @@ private:
void addFile(const std::string & file_path); void addFile(const std::string & file_path);
void initializeFilesFromDisk(); void initializeFilesFromDisk();
void processFiles(const SettingsChanges & settings_changes = {}); /// Set `force = true` if processing of files must be finished fully despite cancellation flag being set
void processFiles(bool force, const SettingsChanges & settings_changes = {});
void processFile(std::string & file_path, const SettingsChanges & settings_changes); void processFile(std::string & file_path, const SettingsChanges & settings_changes);
void processFilesWithBatching(const SettingsChanges & settings_changes); void processFilesWithBatching(bool force, const SettingsChanges & settings_changes);
void markAsBroken(const std::string & file_path); void markAsBroken(const std::string & file_path);
void markAsSend(const std::string & file_path); void markAsSend(const std::string & file_path);