From 5139067631fe29adf3f2c8178ed50dffc74b54c0 Mon Sep 17 00:00:00 2001 From: Azat Khuzhin Date: Thu, 5 Aug 2021 21:04:11 +0300 Subject: [PATCH 1/5] Guard BackgroundJobsExecutor from thread termination in case of uncaught exception --- src/Storages/MergeTree/BackgroundJobsExecutor.cpp | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/Storages/MergeTree/BackgroundJobsExecutor.cpp b/src/Storages/MergeTree/BackgroundJobsExecutor.cpp index 36803ba5197..e4b96e55c87 100644 --- a/src/Storages/MergeTree/BackgroundJobsExecutor.cpp +++ b/src/Storages/MergeTree/BackgroundJobsExecutor.cpp @@ -180,10 +180,16 @@ void IBackgroundJobExecutor::triggerTask() } void IBackgroundJobExecutor::backgroundTaskFunction() +try { if (!scheduleJob()) scheduleTask(/* with_backoff = */ true); } +catch (...) /// Catch any exception to avoid thread termination. +{ + tryLogCurrentException(__PRETTY_FUNCTION__); + scheduleTask(/* with_backoff = */ true); +} IBackgroundJobExecutor::~IBackgroundJobExecutor() { From 7964355ecf162a34311070a455b9732a2ae7cf77 Mon Sep 17 00:00:00 2001 From: Azat Khuzhin Date: Thu, 5 Aug 2021 21:04:11 +0300 Subject: [PATCH 2/5] Fix mutation stuck on invalid partitions in non-replicated MergeTree v2: Do not try to process empty mutations Found with flaky check [1]. [1]: https://clickhouse-test-reports.s3.yandex.net/27248/66e8c0833392c20ba8dba3780f2b0d5c18f8194e/functional_stateless_tests_flaky_check_(address).html#fail1 --- src/Storages/StorageMergeTree.cpp | 40 ++++++++++++++----- ...invalid_partition_mutation_stuck.reference | 0 ...02004_invalid_partition_mutation_stuck.sql | 33 +++++++++++++++ 3 files changed, 62 insertions(+), 11 deletions(-) create mode 100644 tests/queries/0_stateless/02004_invalid_partition_mutation_stuck.reference create mode 100644 tests/queries/0_stateless/02004_invalid_partition_mutation_stuck.sql diff --git a/src/Storages/StorageMergeTree.cpp b/src/Storages/StorageMergeTree.cpp index 4f0046eecba..6142115cbc8 100644 --- a/src/Storages/StorageMergeTree.cpp +++ b/src/Storages/StorageMergeTree.cpp @@ -958,9 +958,19 @@ std::shared_ptr StorageMergeTree::se if (!commands_for_size_validation.empty()) { - MutationsInterpreter interpreter( - shared_from_this(), metadata_snapshot, commands_for_size_validation, getContext(), false); - commands_size += interpreter.evaluateCommandsSize(); + try + { + MutationsInterpreter interpreter( + shared_from_this(), metadata_snapshot, commands_for_size_validation, getContext(), false); + commands_size += interpreter.evaluateCommandsSize(); + } + catch (...) + { + MergeTreeMutationEntry & entry = it->second; + entry.latest_fail_time = time(nullptr); + entry.latest_fail_reason = getCurrentExceptionMessage(false); + continue; + } } if (current_ast_elements + commands_size >= max_ast_elements) @@ -970,17 +980,25 @@ std::shared_ptr StorageMergeTree::se commands.insert(commands.end(), it->second.commands.begin(), it->second.commands.end()); } - auto new_part_info = part->info; - new_part_info.mutation = current_mutations_by_version.rbegin()->first; + if (!commands.empty()) + { + auto new_part_info = part->info; + new_part_info.mutation = current_mutations_by_version.rbegin()->first; - future_part.parts.push_back(part); - future_part.part_info = new_part_info; - future_part.name = part->getNewName(new_part_info); - future_part.type = part->getType(); + future_part.parts.push_back(part); + future_part.part_info = new_part_info; + future_part.name = part->getNewName(new_part_info); + future_part.type = part->getType(); - tagger = std::make_unique(future_part, MergeTreeDataMergerMutator::estimateNeededDiskSpace({part}), *this, metadata_snapshot, true); - return std::make_shared(future_part, std::move(tagger), commands); + tagger = std::make_unique(future_part, MergeTreeDataMergerMutator::estimateNeededDiskSpace({part}), *this, metadata_snapshot, true); + return std::make_shared(future_part, std::move(tagger), commands); + } } + + /// Notify in case of errors + std::unique_lock lock(mutation_wait_mutex); + mutation_wait_event.notify_all(); + return {}; } diff --git a/tests/queries/0_stateless/02004_invalid_partition_mutation_stuck.reference b/tests/queries/0_stateless/02004_invalid_partition_mutation_stuck.reference new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/queries/0_stateless/02004_invalid_partition_mutation_stuck.sql b/tests/queries/0_stateless/02004_invalid_partition_mutation_stuck.sql new file mode 100644 index 00000000000..481a5565095 --- /dev/null +++ b/tests/queries/0_stateless/02004_invalid_partition_mutation_stuck.sql @@ -0,0 +1,33 @@ +SET mutations_sync=2; + +DROP TABLE IF EXISTS rep_data; +CREATE TABLE rep_data +( + p Int, + t DateTime, + INDEX idx t TYPE minmax GRANULARITY 1 +) +ENGINE = ReplicatedMergeTree('/clickhouse/tables/{database}/rep_data', '1') +PARTITION BY p +ORDER BY t +SETTINGS number_of_free_entries_in_pool_to_execute_mutation=0; +INSERT INTO rep_data VALUES (1, now()); +ALTER TABLE rep_data MATERIALIZE INDEX idx IN PARTITION ID 'NO_SUCH_PART'; -- { serverError 248 } +ALTER TABLE rep_data MATERIALIZE INDEX idx IN PARTITION ID '1'; +ALTER TABLE rep_data MATERIALIZE INDEX idx IN PARTITION ID '2'; + +DROP TABLE IF EXISTS data; +CREATE TABLE data +( + p Int, + t DateTime, + INDEX idx t TYPE minmax GRANULARITY 1 +) +ENGINE = MergeTree +PARTITION BY p +ORDER BY t +SETTINGS number_of_free_entries_in_pool_to_execute_mutation=0; +INSERT INTO data VALUES (1, now()); +ALTER TABLE data MATERIALIZE INDEX idx IN PARTITION ID 'NO_SUCH_PART'; -- { serverError 341 } +ALTER TABLE data MATERIALIZE INDEX idx IN PARTITION ID '1'; +ALTER TABLE data MATERIALIZE INDEX idx IN PARTITION ID '2'; From 91d7f3daa7ac8ceb9c38767ceeda909bd5388548 Mon Sep 17 00:00:00 2001 From: Azat Khuzhin Date: Fri, 6 Aug 2021 01:04:02 +0300 Subject: [PATCH 3/5] Fix lock-order-inversion while notifying about finished mutations TSAN reports [1]: WARNING: ThreadSanitizer: lock-order-inversion (potential deadlock) (pid=36) Cycle in lock order graph: M16388 (0x7b7400011d68) => M1030334152907497744 (0x000000000000) => M16388 Mutex M1030334152907497744 acquired here while holding mutex M16388 in thread T4: 0 pthread_mutex_lock (clickhouse+0x967d536) 1 std::__1::__libcpp_mutex_lock(pthread_mutex_t*) obj-x86_64-linux-gnu/../contrib/libcxx/include/__threading_support:405:10 (clickhouse+0x1b25c7d9) 2 std::__1::mutex::lock() obj-x86_64-linux-gnu/../contrib/libcxx/src/mutex.cpp:33:14 (clickhouse+0x1b25c7d9) 3 std::__1::lock_guard::lock_guard(std::__1::mutex&) obj-x86_64-linux-gnu/../contrib/libcxx/include/__mutex_base:91:27 (clickhouse+0x15220cd9) 4 DB::StorageMergeTree::getIncompleteMutationsStatus(long, std::__1::set, std::__1::allocator >, std::__1::less, std::__1::allocator > >, std:> 5 DB::StorageMergeTree::waitForMutation(long, std::__1::basic_string, std::__1::allocator > const&)::$_0::operator()() const obj-x86_64-linux-gnu/../src/Storages/StorageMergeTree.cpp:464:36 (clickhouse+0x1521e2b1) 6 void std::__1::condition_variable::wait, std::__1::allocator > const&)::$_0>(std::__1::unique_lock&, DB::StorageMergeTree::waitForMutation(lon> 7 DB::StorageMergeTree::waitForMutation(long, std::__1::basic_string, std::__1::allocator > const&) obj-x86_64-linux-gnu/../src/Storages/StorageMergeTree.cpp:469:29 (clickhouse+0x1521e2b1) 8 DB::StorageMergeTree::mutate(DB::MutationCommands const&, std::__1::shared_ptr) obj-x86_64-linux-gnu/../src/Storages/StorageMergeTree.cpp:496:9 (clickhouse+0x15221738) 9 DB::InterpreterAlterQuery::execute() obj-x86_64-linux-gnu/../src/Interpreters/InterpreterAlterQuery.cpp:113:16 (clickhouse+0x141182f6) 10 DB::executeQueryImpl(char const*, char const*, std::__1::shared_ptr, bool, DB::QueryProcessingStage::Enum, bool, DB::ReadBuffer*) obj-x86_64-linux-gnu/../src/Interpreters/executeQuery.cpp:560:32 (clickhouse+0x149152f6) 11 DB::executeQuery(std::__1::basic_string, std::__1::allocator > const&, std::__1::shared_ptr, bool, DB::QueryProcessingStage::Enum, bool) obj-x86_64-linux-gnu/../src/Interpreters/executeQuery.cpp:909:30 (clickhous> 12 DB::TCPHandler::runImpl() obj-x86_64-linux-gnu/../src/Server/TCPHandler.cpp:313:24 (clickhouse+0x153270af) Hint: use TSAN_OPTIONS=second_deadlock_stack=1 to get more informative warning message Mutex M16388 acquired here while holding mutex M1030334152907497744 in thread T59: 0 pthread_mutex_lock (clickhouse+0x967d536) 1 std::__1::__libcpp_mutex_lock(pthread_mutex_t*) obj-x86_64-linux-gnu/../contrib/libcxx/include/__threading_support:405:10 (clickhouse+0x1b25c7d9) 2 std::__1::mutex::lock() obj-x86_64-linux-gnu/../contrib/libcxx/src/mutex.cpp:33:14 (clickhouse+0x1b25c7d9) 3 std::__1::unique_lock::unique_lock(std::__1::mutex&) obj-x86_64-linux-gnu/../contrib/libcxx/include/__mutex_base:119:61 (clickhouse+0x15226c31) 4 DB::StorageMergeTree::selectPartsToMutate(std::__1::shared_ptr const&, std::__1::basic_string, std::__1::allocator >*, std::__1::shared_ptr&) obj-x86_64-linux-> 5 DB::StorageMergeTree::scheduleDataProcessingJob(DB::IBackgroundJobExecutor&) obj-x86_64-linux-gnu/../src/Storages/StorageMergeTree.cpp:1060:28 (clickhouse+0x15228b10) 6 DB::BackgroundJobsExecutor::scheduleJob() obj-x86_64-linux-gnu/../src/Storages/MergeTree/BackgroundJobsExecutor.cpp:229:17 (clickhouse+0x14f441dc) 7 DB::IBackgroundJobExecutor::backgroundTaskFunction() obj-x86_64-linux-gnu/../src/Storages/MergeTree/BackgroundJobsExecutor.cpp:185:10 (clickhouse+0x14f438ed) [1]: https://clickhouse-test-reports.s3.yandex.net/27248/4f3b80ff33c846465983aa2bc9ae9490e1118b15/fuzzer_tsan/report.htmlfail1 --- src/Storages/StorageMergeTree.cpp | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/src/Storages/StorageMergeTree.cpp b/src/Storages/StorageMergeTree.cpp index 6142115cbc8..01dc3b17fc3 100644 --- a/src/Storages/StorageMergeTree.cpp +++ b/src/Storages/StorageMergeTree.cpp @@ -995,10 +995,6 @@ std::shared_ptr StorageMergeTree::se } } - /// Notify in case of errors - std::unique_lock lock(mutation_wait_mutex); - mutation_wait_event.notify_all(); - return {}; } @@ -1053,6 +1049,7 @@ bool StorageMergeTree::scheduleDataProcessingJob(IBackgroundJobExecutor & execut auto share_lock = lockForShare(RWLockImpl::NO_QUERY, getSettings()->lock_acquire_timeout_for_background_operations); + bool has_mutations; { std::unique_lock lock(currently_processing_in_background_mutex); if (merger_mutator.merges_blocker.isCancelled()) @@ -1061,6 +1058,15 @@ bool StorageMergeTree::scheduleDataProcessingJob(IBackgroundJobExecutor & execut merge_entry = selectPartsToMerge(metadata_snapshot, false, {}, false, nullptr, share_lock, lock); if (!merge_entry) mutate_entry = selectPartsToMutate(metadata_snapshot, nullptr, share_lock); + + has_mutations = !current_mutations_by_version.empty(); + } + + if (!mutate_entry && has_mutations) + { + /// Notify in case of errors + std::lock_guard lock(mutation_wait_mutex); + mutation_wait_event.notify_all(); } if (merge_entry) From a0a5c0da3256f4e81e2ee19ea1c9c774b638cb5c Mon Sep 17 00:00:00 2001 From: alexey-milovidov Date: Wed, 11 Aug 2021 06:08:30 +0300 Subject: [PATCH 4/5] Update BackgroundJobsExecutor.cpp --- src/Storages/MergeTree/BackgroundJobsExecutor.cpp | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/Storages/MergeTree/BackgroundJobsExecutor.cpp b/src/Storages/MergeTree/BackgroundJobsExecutor.cpp index e4b96e55c87..9ff7dd60266 100644 --- a/src/Storages/MergeTree/BackgroundJobsExecutor.cpp +++ b/src/Storages/MergeTree/BackgroundJobsExecutor.cpp @@ -146,6 +146,9 @@ try catch (...) /// Exception while we looking for a task, reschedule { tryLogCurrentException(__PRETTY_FUNCTION__); + + /// Why do we scheduleTask again? + /// To retry on exception, since it may be some temporary exception. scheduleTask(/* with_backoff = */ true); } From 08f8511854fcf4600607f403ca7117d4ce63f7b4 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Wed, 11 Aug 2021 06:52:28 +0300 Subject: [PATCH 5/5] Fix Style --- src/Storages/MergeTree/BackgroundJobsExecutor.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Storages/MergeTree/BackgroundJobsExecutor.cpp b/src/Storages/MergeTree/BackgroundJobsExecutor.cpp index 9ff7dd60266..f3d957117e8 100644 --- a/src/Storages/MergeTree/BackgroundJobsExecutor.cpp +++ b/src/Storages/MergeTree/BackgroundJobsExecutor.cpp @@ -146,7 +146,7 @@ try catch (...) /// Exception while we looking for a task, reschedule { tryLogCurrentException(__PRETTY_FUNCTION__); - + /// Why do we scheduleTask again? /// To retry on exception, since it may be some temporary exception. scheduleTask(/* with_backoff = */ true);