From b25a4e066c983439f5b00f1fa55a357c20651d27 Mon Sep 17 00:00:00 2001 From: Azat Khuzhin Date: Sun, 4 Apr 2021 12:23:04 +0300 Subject: [PATCH 1/3] Lock MEMORY_LIMIT_EXCEEDED in ThreadStatus::detachQuery() Found with fuzzer [1]: BaseDaemon: (version 21.5.1.6440, build id: 3B097C902DDAA35688D90750552ED499DC5D10A0) (from thread 8012) Terminate called for uncaught exception: Code: 241, e.displayText() = DB::Exception: Memory limit (for user) exceeded: would use 153.51 MiB (attempt to allocate chunk of 4194368 bytes), maximum: 150.00 MiB, Stack trace (when copying this message, always include the lines below): 0. ./obj-x86_64-linux-gnu/../contrib/libcxx/include/exception:133: Poco::Exception::Exception(std::__1::basic_string, std::__1::allocator > const&, int) @ 0x26fdcdd9 in /usr/bin/clickhouse 1. ./obj-x86_64-linux-gnu/../src/Common/Exception.cpp:57: DB::Exception::Exception(std::__1::basic_string, std::__1::allocator > const&, int, bool) @ 0xad0df02 in /usr/bin/clickhouse 2. ./obj-x86_64-linux-gnu/../contrib/libcxx/include/string:1444: DB::Exception::Exception, std::__1::allocator >, long&, std::__1::basic_string, std::__1::allocator > >(int, std::__1::basic_string, std::__1::allocator > const&, char const*&&, char const*&&, std::__1::basic_string, std::__1::allocator >&&, long&, std::__1::basic_string, std::__1::allocator >&&) @ 0xacc7bef in /usr/bin/clickhouse 3. ./obj-x86_64-linux-gnu/../src/Common/MemoryTracker.cpp:219: MemoryTracker::alloc(long) @ 0xacc65eb in /usr/bin/clickhouse 4. ./obj-x86_64-linux-gnu/../src/Common/MemoryTracker.cpp:0: MemoryTracker::alloc(long) @ 0xacc5dad in /usr/bin/clickhouse 5. ./obj-x86_64-linux-gnu/../src/Common/MemoryTracker.cpp:0: MemoryTracker::alloc(long) @ 0xacc5dad in /usr/bin/clickhouse 6. ./obj-x86_64-linux-gnu/../src/Common/AllocatorWithMemoryTracking.h:35: AllocatorWithMemoryTracking::allocate(unsigned long) @ 0xad0a2fe in /usr/bin/clickhouse 7. void std::__1::vector >::__push_back_slow_path(DB::Field&&) @ 0x11712a51 in /usr/bin/clickhouse 8. ./obj-x86_64-linux-gnu/../src/Interpreters/ThreadStatusExt.cpp:356: DB::ThreadStatus::detachQuery(bool, bool) @ 0x1f5d5237 in /usr/bin/clickhouse 9. ./obj-x86_64-linux-gnu/../src/Processors/Executors/PipelineExecutor.cpp:0: void std::__1::__function::__policy_invoker::__call_impl(DB::PipelineExecutor::executeImpl(unsigned long)::$_4&&)::'lambda'(), void ()> >(std::__1::__function::__policy_storage const*) @ 0x20c488e6 in /usr/bin/clickhouse 10. ./obj-x86_64-linux-gnu/../contrib/libcxx/include/functional:0: ThreadPoolImpl::worker(std::__1::__list_iterator) @ 0xad9f6cc in /usr/bin/clickhouse 11. ./obj-x86_64-linux-gnu/../contrib/libcxx/include/memory:1655: void* std::__1::__thread_proxy >, void ThreadPoolImpl::scheduleImpl(std::__1::function, int, std::__1::optional)::'lambda1'()> >(void*) @ 0xada8264 in /usr/bin/clickhouse 12. start_thread @ 0x9609 in /usr/lib/x86_64-linux-gnu/libpthread-2.31.so 13. __clone @ 0x122293 in /usr/lib/x86_64-linux-gnu/libc-2.31.so (version 21.5.1.6440) [1]: https://clickhouse-test-reports.s3.yandex.net/22583/69296876005c0fa171c755f8b224e4d58192c402/stress_test_(address).html#fail1 --- src/Interpreters/ThreadStatusExt.cpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/Interpreters/ThreadStatusExt.cpp b/src/Interpreters/ThreadStatusExt.cpp index 8a979721290..c45902ae497 100644 --- a/src/Interpreters/ThreadStatusExt.cpp +++ b/src/Interpreters/ThreadStatusExt.cpp @@ -316,6 +316,8 @@ void ThreadStatus::finalizeQueryProfiler() void ThreadStatus::detachQuery(bool exit_if_already_detached, bool thread_exits) { + MemoryTracker::LockExceptionInThread lock; + if (exit_if_already_detached && thread_state == ThreadState::DetachedFromQuery) { thread_state = thread_exits ? ThreadState::Died : ThreadState::DetachedFromQuery; From 9394e6e5edc92a1de19957fabdc26a5f9ad94e27 Mon Sep 17 00:00:00 2001 From: Azat Khuzhin Date: Sun, 4 Apr 2021 12:45:30 +0300 Subject: [PATCH 2/3] Add SCOPE_EXIT_SAFE/SCOPE_EXIT_MEMORY_SAFE helpers --- base/ext/scope_guard_safe.h | 66 +++++++++++++++++++++++++++++++++++++ 1 file changed, 66 insertions(+) create mode 100644 base/ext/scope_guard_safe.h diff --git a/base/ext/scope_guard_safe.h b/base/ext/scope_guard_safe.h new file mode 100644 index 00000000000..7cfb3959a81 --- /dev/null +++ b/base/ext/scope_guard_safe.h @@ -0,0 +1,66 @@ +#pragma once + +#include +#include +#include + +/// Same as SCOPE_EXIT() but block the MEMORY_LIMIT_EXCEEDED errors. +/// +/// Typical example of SCOPE_EXIT_MEMORY() usage is when code under it may do +/// some tiny allocations, that may fail under high memory pressure or/and low +/// max_memory_usage (and related limits). +/// +/// NOTE: it should be used with caution. +#define SCOPE_EXIT_MEMORY(...) SCOPE_EXIT( \ + MemoryTracker::LockExceptionInThread lock_memory_tracker; \ + __VA_ARGS__; \ +) + +/// Same as SCOPE_EXIT() but try/catch/tryLogCurrentException any exceptions. +/// +/// SCOPE_EXIT_SAFE() should be used in case the exception during the code +/// under SCOPE_EXIT() is not "that fatal" and error message in log is enough. +/// +/// Good example is calling CurrentThread::detachQueryIfNotDetached(). +/// +/// Anti-pattern is calling WriteBuffer::finalize() under SCOPE_EXIT_SAFE() +/// (since finalize() can do final write and it is better to fail abnormally +/// instead of ignoring write error). +/// +/// NOTE: it should be used with double caution. +#define SCOPE_EXIT_SAFE(...) SCOPE_EXIT( \ + try \ + { \ + __VA_ARGS__; \ + } \ + catch (...) \ + { \ + tryLogCurrentException(__PRETTY_FUNCTION__); \ + } \ +) + +/// Same as SCOPE_EXIT() but: +/// - block the MEMORY_LIMIT_EXCEEDED errors, +/// - try/catch/tryLogCurrentException any exceptions. +/// +/// SCOPE_EXIT_MEMORY_SAFE() can be used when the error can be ignored, and in +/// addition to SCOPE_EXIT_SAFE() it will also lock MEMORY_LIMIT_EXCEEDED to +/// avoid such exceptions. +/// +/// It does exists as a separate helper, since you do not need to lock +/// MEMORY_LIMIT_EXCEEDED always (there are cases when code under SCOPE_EXIT does +/// not do any allocations, while LockExceptionInThread increment atomic +/// variable). +/// +/// NOTE: it should be used with triple caution. +#define SCOPE_EXIT_MEMORY_SAFE(...) SCOPE_EXIT( \ + try \ + { \ + MemoryTracker::LockExceptionInThread lock_memory_tracker; \ + __VA_ARGS__; \ + } \ + catch (...) \ + { \ + tryLogCurrentException(__PRETTY_FUNCTION__); \ + } \ +) From f157278b729d3db6de33d7c0510690a1d2468956 Mon Sep 17 00:00:00 2001 From: Azat Khuzhin Date: Sun, 4 Apr 2021 12:23:40 +0300 Subject: [PATCH 3/3] Safer SCOPE_EXIT It executes the code in the dtor, that should never throw. --- programs/client/Client.cpp | 4 ++-- programs/copier/ClusterCopierApp.cpp | 3 ++- src/Databases/DatabaseLazy.cpp | 14 +++++++------- src/Functions/array/arrayReduce.cpp | 4 ++-- src/Functions/array/arrayReduceInRanges.cpp | 6 +++--- src/Functions/initializeAggregation.cpp | 4 ++-- src/Functions/runningAccumulate.cpp | 4 ++-- src/Interpreters/DDLWorker.cpp | 3 ++- src/Interpreters/InterpreterSelectQuery.cpp | 4 ++-- src/Processors/Executors/PipelineExecutor.cpp | 17 +++++++++-------- .../Executors/PullingAsyncPipelineExecutor.cpp | 4 ++-- .../Formats/Impl/ParallelParsingInputFormat.cpp | 6 +++--- .../MergeTree/MergeTreeDataSelectExecutor.cpp | 4 ++-- src/Storages/StorageReplicatedMergeTree.cpp | 7 ++++--- 14 files changed, 44 insertions(+), 40 deletions(-) diff --git a/programs/client/Client.cpp b/programs/client/Client.cpp index 0c5bbaf3edd..555b1adc414 100644 --- a/programs/client/Client.cpp +++ b/programs/client/Client.cpp @@ -21,7 +21,7 @@ #include #include #include -#include +#include #include #include #include @@ -1610,7 +1610,7 @@ private: { /// Temporarily apply query settings to context. std::optional old_settings; - SCOPE_EXIT({ if (old_settings) context.setSettings(*old_settings); }); + SCOPE_EXIT_SAFE({ if (old_settings) context.setSettings(*old_settings); }); auto apply_query_settings = [&](const IAST & settings_ast) { if (!old_settings) diff --git a/programs/copier/ClusterCopierApp.cpp b/programs/copier/ClusterCopierApp.cpp index e3169a49ecf..7dfadc87716 100644 --- a/programs/copier/ClusterCopierApp.cpp +++ b/programs/copier/ClusterCopierApp.cpp @@ -3,6 +3,7 @@ #include #include #include +#include #include @@ -112,7 +113,7 @@ void ClusterCopierApp::mainImpl() SharedContextHolder shared_context = Context::createShared(); auto context = std::make_unique(Context::createGlobal(shared_context.get())); context->makeGlobalContext(); - SCOPE_EXIT(context->shutdown()); + SCOPE_EXIT_SAFE(context->shutdown()); context->setConfig(loaded_config.configuration); context->setApplicationType(Context::ApplicationType::LOCAL); diff --git a/src/Databases/DatabaseLazy.cpp b/src/Databases/DatabaseLazy.cpp index f297bf2c82f..b1bc55bf58d 100644 --- a/src/Databases/DatabaseLazy.cpp +++ b/src/Databases/DatabaseLazy.cpp @@ -10,7 +10,7 @@ #include #include -#include +#include #include #include @@ -61,7 +61,7 @@ void DatabaseLazy::createTable( const StoragePtr & table, const ASTPtr & query) { - SCOPE_EXIT({ clearExpiredTables(); }); + SCOPE_EXIT_MEMORY_SAFE({ clearExpiredTables(); }); if (!endsWith(table->getName(), "Log")) throw Exception("Lazy engine can be used only with *Log tables.", ErrorCodes::UNSUPPORTED_METHOD); DatabaseOnDisk::createTable(context, table_name, table, query); @@ -78,7 +78,7 @@ void DatabaseLazy::dropTable( const String & table_name, bool no_delay) { - SCOPE_EXIT({ clearExpiredTables(); }); + SCOPE_EXIT_MEMORY_SAFE({ clearExpiredTables(); }); DatabaseOnDisk::dropTable(context, table_name, no_delay); } @@ -90,7 +90,7 @@ void DatabaseLazy::renameTable( bool exchange, bool dictionary) { - SCOPE_EXIT({ clearExpiredTables(); }); + SCOPE_EXIT_MEMORY_SAFE({ clearExpiredTables(); }); DatabaseOnDisk::renameTable(context, table_name, to_database, to_table_name, exchange, dictionary); } @@ -115,14 +115,14 @@ void DatabaseLazy::alterTable( bool DatabaseLazy::isTableExist(const String & table_name) const { - SCOPE_EXIT({ clearExpiredTables(); }); + SCOPE_EXIT_MEMORY_SAFE({ clearExpiredTables(); }); std::lock_guard lock(mutex); return tables_cache.find(table_name) != tables_cache.end(); } StoragePtr DatabaseLazy::tryGetTable(const String & table_name) const { - SCOPE_EXIT({ clearExpiredTables(); }); + SCOPE_EXIT_MEMORY_SAFE({ clearExpiredTables(); }); { std::lock_guard lock(mutex); auto it = tables_cache.find(table_name); @@ -224,7 +224,7 @@ DatabaseLazy::~DatabaseLazy() StoragePtr DatabaseLazy::loadTable(const String & table_name) const { - SCOPE_EXIT({ clearExpiredTables(); }); + SCOPE_EXIT_MEMORY_SAFE({ clearExpiredTables(); }); LOG_DEBUG(log, "Load table {} to cache.", backQuote(table_name)); diff --git a/src/Functions/array/arrayReduce.cpp b/src/Functions/array/arrayReduce.cpp index 342a4b18854..0611a4f5d0c 100644 --- a/src/Functions/array/arrayReduce.cpp +++ b/src/Functions/array/arrayReduce.cpp @@ -11,7 +11,7 @@ #include #include -#include +#include namespace DB @@ -172,7 +172,7 @@ ColumnPtr FunctionArrayReduce::executeImpl(const ColumnsWithTypeAndName & argume } } - SCOPE_EXIT({ + SCOPE_EXIT_MEMORY_SAFE({ for (size_t i = 0; i < input_rows_count; ++i) agg_func.destroy(places[i]); }); diff --git a/src/Functions/array/arrayReduceInRanges.cpp b/src/Functions/array/arrayReduceInRanges.cpp index 9839d2a8fe7..55e4d81f36c 100644 --- a/src/Functions/array/arrayReduceInRanges.cpp +++ b/src/Functions/array/arrayReduceInRanges.cpp @@ -13,7 +13,7 @@ #include #include -#include +#include namespace DB @@ -252,7 +252,7 @@ ColumnPtr FunctionArrayReduceInRanges::executeImpl(const ColumnsWithTypeAndName } } - SCOPE_EXIT({ + SCOPE_EXIT_MEMORY_SAFE({ for (size_t j = 0; j < place_total; ++j) agg_func.destroy(places[j]); }); @@ -331,7 +331,7 @@ ColumnPtr FunctionArrayReduceInRanges::executeImpl(const ColumnsWithTypeAndName AggregateDataPtr place = arena->alignedAlloc(agg_func.sizeOfData(), agg_func.alignOfData()); agg_func.create(place); - SCOPE_EXIT({ + SCOPE_EXIT_MEMORY_SAFE({ agg_func.destroy(place); }); diff --git a/src/Functions/initializeAggregation.cpp b/src/Functions/initializeAggregation.cpp index 4b90b7956c5..76a885fd730 100644 --- a/src/Functions/initializeAggregation.cpp +++ b/src/Functions/initializeAggregation.cpp @@ -9,7 +9,7 @@ #include #include -#include +#include namespace DB @@ -132,7 +132,7 @@ ColumnPtr FunctionInitializeAggregation::executeImpl(const ColumnsWithTypeAndNam } } - SCOPE_EXIT({ + SCOPE_EXIT_MEMORY_SAFE({ for (size_t i = 0; i < input_rows_count; ++i) agg_func.destroy(places[i]); }); diff --git a/src/Functions/runningAccumulate.cpp b/src/Functions/runningAccumulate.cpp index 1a01b88e22c..f3f142bb846 100644 --- a/src/Functions/runningAccumulate.cpp +++ b/src/Functions/runningAccumulate.cpp @@ -5,7 +5,7 @@ #include #include #include -#include +#include namespace DB @@ -104,7 +104,7 @@ public: const auto & states = column_with_states->getData(); bool state_created = false; - SCOPE_EXIT({ + SCOPE_EXIT_MEMORY_SAFE({ if (state_created) agg_func.destroy(place.data()); }); diff --git a/src/Interpreters/DDLWorker.cpp b/src/Interpreters/DDLWorker.cpp index eceb48ae773..9021f48057e 100644 --- a/src/Interpreters/DDLWorker.cpp +++ b/src/Interpreters/DDLWorker.cpp @@ -29,6 +29,7 @@ #include #include #include +#include namespace fs = std::filesystem; @@ -820,7 +821,7 @@ bool DDLWorker::tryExecuteQueryOnLeaderReplica( zookeeper->set(tries_to_execute_path, toString(counter + 1)); task.ops.push_back(create_shard_flag); - SCOPE_EXIT({ if (!executed_by_us && !task.ops.empty()) task.ops.pop_back(); }); + SCOPE_EXIT_MEMORY({ if (!executed_by_us && !task.ops.empty()) task.ops.pop_back(); }); /// If the leader will unexpectedly changed this method will return false /// and on the next iteration new leader will take lock diff --git a/src/Interpreters/InterpreterSelectQuery.cpp b/src/Interpreters/InterpreterSelectQuery.cpp index 1f6b0c37437..1f440aef817 100644 --- a/src/Interpreters/InterpreterSelectQuery.cpp +++ b/src/Interpreters/InterpreterSelectQuery.cpp @@ -82,7 +82,7 @@ #include #include #include -#include +#include #include @@ -1401,7 +1401,7 @@ void InterpreterSelectQuery::executeFetchColumns(QueryProcessingStage::Enum proc AggregateDataPtr place = state.data(); agg_count.create(place); - SCOPE_EXIT(agg_count.destroy(place)); + SCOPE_EXIT_MEMORY_SAFE(agg_count.destroy(place)); agg_count.set(place, *num_rows); diff --git a/src/Processors/Executors/PipelineExecutor.cpp b/src/Processors/Executors/PipelineExecutor.cpp index a724f22ed31..b1751dfd030 100644 --- a/src/Processors/Executors/PipelineExecutor.cpp +++ b/src/Processors/Executors/PipelineExecutor.cpp @@ -1,14 +1,15 @@ -#include #include #include -#include #include -#include #include -#include #include +#include +#include +#include +#include #include #include +#include #ifndef NDEBUG #include @@ -740,7 +741,7 @@ void PipelineExecutor::executeImpl(size_t num_threads) bool finished_flag = false; - SCOPE_EXIT( + SCOPE_EXIT_SAFE( if (!finished_flag) { finish(); @@ -766,9 +767,9 @@ void PipelineExecutor::executeImpl(size_t num_threads) if (thread_group) CurrentThread::attachTo(thread_group); - SCOPE_EXIT( - if (thread_group) - CurrentThread::detachQueryIfNotDetached(); + SCOPE_EXIT_SAFE( + if (thread_group) + CurrentThread::detachQueryIfNotDetached(); ); try diff --git a/src/Processors/Executors/PullingAsyncPipelineExecutor.cpp b/src/Processors/Executors/PullingAsyncPipelineExecutor.cpp index f1626414375..9f1999bc4a3 100644 --- a/src/Processors/Executors/PullingAsyncPipelineExecutor.cpp +++ b/src/Processors/Executors/PullingAsyncPipelineExecutor.cpp @@ -5,7 +5,7 @@ #include #include -#include +#include namespace DB { @@ -72,7 +72,7 @@ static void threadFunction(PullingAsyncPipelineExecutor::Data & data, ThreadGrou if (thread_group) CurrentThread::attachTo(thread_group); - SCOPE_EXIT( + SCOPE_EXIT_SAFE( if (thread_group) CurrentThread::detachQueryIfNotDetached(); ); diff --git a/src/Processors/Formats/Impl/ParallelParsingInputFormat.cpp b/src/Processors/Formats/Impl/ParallelParsingInputFormat.cpp index 1ad913a1a59..f295fe00299 100644 --- a/src/Processors/Formats/Impl/ParallelParsingInputFormat.cpp +++ b/src/Processors/Formats/Impl/ParallelParsingInputFormat.cpp @@ -2,14 +2,14 @@ #include #include #include -#include +#include namespace DB { void ParallelParsingInputFormat::segmentatorThreadFunction(ThreadGroupStatusPtr thread_group) { - SCOPE_EXIT( + SCOPE_EXIT_SAFE( if (thread_group) CurrentThread::detachQueryIfNotDetached(); ); @@ -60,7 +60,7 @@ void ParallelParsingInputFormat::segmentatorThreadFunction(ThreadGroupStatusPtr void ParallelParsingInputFormat::parserThreadFunction(ThreadGroupStatusPtr thread_group, size_t current_ticket_number) { - SCOPE_EXIT( + SCOPE_EXIT_SAFE( if (thread_group) CurrentThread::detachQueryIfNotDetached(); ); diff --git a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp index f3759107912..742ebafaf5c 100644 --- a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp +++ b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp @@ -1,5 +1,5 @@ #include /// For calculations related to sampling coefficients. -#include +#include #include #include @@ -704,7 +704,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::readFromParts( for (size_t part_index = 0; part_index < parts.size(); ++part_index) pool.scheduleOrThrowOnError([&, part_index, thread_group = CurrentThread::getGroup()] { - SCOPE_EXIT( + SCOPE_EXIT_SAFE( if (thread_group) CurrentThread::detachQueryIfNotDetached(); ); diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index 1cc7c7299fa..73f0a7907e5 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -26,6 +26,7 @@ #include #include #include +#include #include @@ -59,7 +60,7 @@ #include #include -#include "Storages/MergeTree/MergeTreeReaderCompact.h" +#include #include #include @@ -3693,7 +3694,7 @@ bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const Stora } } - SCOPE_EXIT + SCOPE_EXIT_MEMORY ({ std::lock_guard lock(currently_fetching_parts_mutex); currently_fetching_parts.erase(part_name); @@ -3901,7 +3902,7 @@ bool StorageReplicatedMergeTree::fetchExistsPart(const String & part_name, const } } - SCOPE_EXIT + SCOPE_EXIT_MEMORY ({ std::lock_guard lock(currently_fetching_parts_mutex); currently_fetching_parts.erase(part_name);