Merge pull request #22592 from azat/uncaught-exceptions

Fix some uncaught exceptions (in SCOPE_EXIT) under memory pressure
This commit is contained in:
alexey-milovidov 2021-04-07 07:17:26 +03:00 committed by GitHub
commit a7e216969a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 112 additions and 40 deletions

View File

@ -0,0 +1,66 @@
#pragma once
#include <ext/scope_guard.h>
#include <common/logger_useful.h>
#include <Common/MemoryTracker.h>
/// Same as SCOPE_EXIT() but block the MEMORY_LIMIT_EXCEEDED errors.
///
/// Typical example of SCOPE_EXIT_MEMORY() usage is when code under it may do
/// some tiny allocations, that may fail under high memory pressure or/and low
/// max_memory_usage (and related limits).
///
/// NOTE: it should be used with caution.
#define SCOPE_EXIT_MEMORY(...) SCOPE_EXIT( \
MemoryTracker::LockExceptionInThread lock_memory_tracker; \
__VA_ARGS__; \
)
/// Same as SCOPE_EXIT() but try/catch/tryLogCurrentException any exceptions.
///
/// SCOPE_EXIT_SAFE() should be used in case the exception during the code
/// under SCOPE_EXIT() is not "that fatal" and error message in log is enough.
///
/// Good example is calling CurrentThread::detachQueryIfNotDetached().
///
/// Anti-pattern is calling WriteBuffer::finalize() under SCOPE_EXIT_SAFE()
/// (since finalize() can do final write and it is better to fail abnormally
/// instead of ignoring write error).
///
/// NOTE: it should be used with double caution.
#define SCOPE_EXIT_SAFE(...) SCOPE_EXIT( \
try \
{ \
__VA_ARGS__; \
} \
catch (...) \
{ \
tryLogCurrentException(__PRETTY_FUNCTION__); \
} \
)
/// Same as SCOPE_EXIT() but:
/// - block the MEMORY_LIMIT_EXCEEDED errors,
/// - try/catch/tryLogCurrentException any exceptions.
///
/// SCOPE_EXIT_MEMORY_SAFE() can be used when the error can be ignored, and in
/// addition to SCOPE_EXIT_SAFE() it will also lock MEMORY_LIMIT_EXCEEDED to
/// avoid such exceptions.
///
/// It does exists as a separate helper, since you do not need to lock
/// MEMORY_LIMIT_EXCEEDED always (there are cases when code under SCOPE_EXIT does
/// not do any allocations, while LockExceptionInThread increment atomic
/// variable).
///
/// NOTE: it should be used with triple caution.
#define SCOPE_EXIT_MEMORY_SAFE(...) SCOPE_EXIT( \
try \
{ \
MemoryTracker::LockExceptionInThread lock_memory_tracker; \
__VA_ARGS__; \
} \
catch (...) \
{ \
tryLogCurrentException(__PRETTY_FUNCTION__); \
} \
)

View File

@ -21,7 +21,7 @@
#include <unordered_set> #include <unordered_set>
#include <algorithm> #include <algorithm>
#include <optional> #include <optional>
#include <ext/scope_guard.h> #include <ext/scope_guard_safe.h>
#include <boost/program_options.hpp> #include <boost/program_options.hpp>
#include <boost/algorithm/string/replace.hpp> #include <boost/algorithm/string/replace.hpp>
#include <Poco/String.h> #include <Poco/String.h>
@ -1610,7 +1610,7 @@ private:
{ {
/// Temporarily apply query settings to context. /// Temporarily apply query settings to context.
std::optional<Settings> old_settings; std::optional<Settings> old_settings;
SCOPE_EXIT({ if (old_settings) context.setSettings(*old_settings); }); SCOPE_EXIT_SAFE({ if (old_settings) context.setSettings(*old_settings); });
auto apply_query_settings = [&](const IAST & settings_ast) auto apply_query_settings = [&](const IAST & settings_ast)
{ {
if (!old_settings) if (!old_settings)

View File

@ -3,6 +3,7 @@
#include <Common/TerminalSize.h> #include <Common/TerminalSize.h>
#include <IO/ConnectionTimeoutsContext.h> #include <IO/ConnectionTimeoutsContext.h>
#include <Formats/registerFormats.h> #include <Formats/registerFormats.h>
#include <ext/scope_guard_safe.h>
#include <unistd.h> #include <unistd.h>
@ -112,7 +113,7 @@ void ClusterCopierApp::mainImpl()
SharedContextHolder shared_context = Context::createShared(); SharedContextHolder shared_context = Context::createShared();
auto context = std::make_unique<Context>(Context::createGlobal(shared_context.get())); auto context = std::make_unique<Context>(Context::createGlobal(shared_context.get()));
context->makeGlobalContext(); context->makeGlobalContext();
SCOPE_EXIT(context->shutdown()); SCOPE_EXIT_SAFE(context->shutdown());
context->setConfig(loaded_config.configuration); context->setConfig(loaded_config.configuration);
context->setApplicationType(Context::ApplicationType::LOCAL); context->setApplicationType(Context::ApplicationType::LOCAL);

View File

@ -10,7 +10,7 @@
#include <Storages/IStorage.h> #include <Storages/IStorage.h>
#include <common/logger_useful.h> #include <common/logger_useful.h>
#include <ext/scope_guard.h> #include <ext/scope_guard_safe.h>
#include <iomanip> #include <iomanip>
#include <Poco/File.h> #include <Poco/File.h>
@ -61,7 +61,7 @@ void DatabaseLazy::createTable(
const StoragePtr & table, const StoragePtr & table,
const ASTPtr & query) const ASTPtr & query)
{ {
SCOPE_EXIT({ clearExpiredTables(); }); SCOPE_EXIT_MEMORY_SAFE({ clearExpiredTables(); });
if (!endsWith(table->getName(), "Log")) if (!endsWith(table->getName(), "Log"))
throw Exception("Lazy engine can be used only with *Log tables.", ErrorCodes::UNSUPPORTED_METHOD); throw Exception("Lazy engine can be used only with *Log tables.", ErrorCodes::UNSUPPORTED_METHOD);
DatabaseOnDisk::createTable(context, table_name, table, query); DatabaseOnDisk::createTable(context, table_name, table, query);
@ -78,7 +78,7 @@ void DatabaseLazy::dropTable(
const String & table_name, const String & table_name,
bool no_delay) bool no_delay)
{ {
SCOPE_EXIT({ clearExpiredTables(); }); SCOPE_EXIT_MEMORY_SAFE({ clearExpiredTables(); });
DatabaseOnDisk::dropTable(context, table_name, no_delay); DatabaseOnDisk::dropTable(context, table_name, no_delay);
} }
@ -90,7 +90,7 @@ void DatabaseLazy::renameTable(
bool exchange, bool exchange,
bool dictionary) bool dictionary)
{ {
SCOPE_EXIT({ clearExpiredTables(); }); SCOPE_EXIT_MEMORY_SAFE({ clearExpiredTables(); });
DatabaseOnDisk::renameTable(context, table_name, to_database, to_table_name, exchange, dictionary); DatabaseOnDisk::renameTable(context, table_name, to_database, to_table_name, exchange, dictionary);
} }
@ -115,14 +115,14 @@ void DatabaseLazy::alterTable(
bool DatabaseLazy::isTableExist(const String & table_name) const bool DatabaseLazy::isTableExist(const String & table_name) const
{ {
SCOPE_EXIT({ clearExpiredTables(); }); SCOPE_EXIT_MEMORY_SAFE({ clearExpiredTables(); });
std::lock_guard lock(mutex); std::lock_guard lock(mutex);
return tables_cache.find(table_name) != tables_cache.end(); return tables_cache.find(table_name) != tables_cache.end();
} }
StoragePtr DatabaseLazy::tryGetTable(const String & table_name) const StoragePtr DatabaseLazy::tryGetTable(const String & table_name) const
{ {
SCOPE_EXIT({ clearExpiredTables(); }); SCOPE_EXIT_MEMORY_SAFE({ clearExpiredTables(); });
{ {
std::lock_guard lock(mutex); std::lock_guard lock(mutex);
auto it = tables_cache.find(table_name); auto it = tables_cache.find(table_name);
@ -224,7 +224,7 @@ DatabaseLazy::~DatabaseLazy()
StoragePtr DatabaseLazy::loadTable(const String & table_name) const StoragePtr DatabaseLazy::loadTable(const String & table_name) const
{ {
SCOPE_EXIT({ clearExpiredTables(); }); SCOPE_EXIT_MEMORY_SAFE({ clearExpiredTables(); });
LOG_DEBUG(log, "Load table {} to cache.", backQuote(table_name)); LOG_DEBUG(log, "Load table {} to cache.", backQuote(table_name));

View File

@ -11,7 +11,7 @@
#include <AggregateFunctions/parseAggregateFunctionParameters.h> #include <AggregateFunctions/parseAggregateFunctionParameters.h>
#include <Common/Arena.h> #include <Common/Arena.h>
#include <ext/scope_guard.h> #include <ext/scope_guard_safe.h>
namespace DB namespace DB
@ -172,7 +172,7 @@ ColumnPtr FunctionArrayReduce::executeImpl(const ColumnsWithTypeAndName & argume
} }
} }
SCOPE_EXIT({ SCOPE_EXIT_MEMORY_SAFE({
for (size_t i = 0; i < input_rows_count; ++i) for (size_t i = 0; i < input_rows_count; ++i)
agg_func.destroy(places[i]); agg_func.destroy(places[i]);
}); });

View File

@ -13,7 +13,7 @@
#include <AggregateFunctions/parseAggregateFunctionParameters.h> #include <AggregateFunctions/parseAggregateFunctionParameters.h>
#include <Common/Arena.h> #include <Common/Arena.h>
#include <ext/scope_guard.h> #include <ext/scope_guard_safe.h>
namespace DB namespace DB
@ -252,7 +252,7 @@ ColumnPtr FunctionArrayReduceInRanges::executeImpl(const ColumnsWithTypeAndName
} }
} }
SCOPE_EXIT({ SCOPE_EXIT_MEMORY_SAFE({
for (size_t j = 0; j < place_total; ++j) for (size_t j = 0; j < place_total; ++j)
agg_func.destroy(places[j]); agg_func.destroy(places[j]);
}); });
@ -331,7 +331,7 @@ ColumnPtr FunctionArrayReduceInRanges::executeImpl(const ColumnsWithTypeAndName
AggregateDataPtr place = arena->alignedAlloc(agg_func.sizeOfData(), agg_func.alignOfData()); AggregateDataPtr place = arena->alignedAlloc(agg_func.sizeOfData(), agg_func.alignOfData());
agg_func.create(place); agg_func.create(place);
SCOPE_EXIT({ SCOPE_EXIT_MEMORY_SAFE({
agg_func.destroy(place); agg_func.destroy(place);
}); });

View File

@ -9,7 +9,7 @@
#include <AggregateFunctions/parseAggregateFunctionParameters.h> #include <AggregateFunctions/parseAggregateFunctionParameters.h>
#include <Common/Arena.h> #include <Common/Arena.h>
#include <ext/scope_guard.h> #include <ext/scope_guard_safe.h>
namespace DB namespace DB
@ -132,7 +132,7 @@ ColumnPtr FunctionInitializeAggregation::executeImpl(const ColumnsWithTypeAndNam
} }
} }
SCOPE_EXIT({ SCOPE_EXIT_MEMORY_SAFE({
for (size_t i = 0; i < input_rows_count; ++i) for (size_t i = 0; i < input_rows_count; ++i)
agg_func.destroy(places[i]); agg_func.destroy(places[i]);
}); });

View File

@ -5,7 +5,7 @@
#include <DataTypes/DataTypeAggregateFunction.h> #include <DataTypes/DataTypeAggregateFunction.h>
#include <Common/AlignedBuffer.h> #include <Common/AlignedBuffer.h>
#include <Common/Arena.h> #include <Common/Arena.h>
#include <ext/scope_guard.h> #include <ext/scope_guard_safe.h>
namespace DB namespace DB
@ -104,7 +104,7 @@ public:
const auto & states = column_with_states->getData(); const auto & states = column_with_states->getData();
bool state_created = false; bool state_created = false;
SCOPE_EXIT({ SCOPE_EXIT_MEMORY_SAFE({
if (state_created) if (state_created)
agg_func.destroy(place.data()); agg_func.destroy(place.data());
}); });

View File

@ -29,6 +29,7 @@
#include <common/logger_useful.h> #include <common/logger_useful.h>
#include <random> #include <random>
#include <pcg_random.hpp> #include <pcg_random.hpp>
#include <ext/scope_guard_safe.h>
namespace fs = std::filesystem; namespace fs = std::filesystem;
@ -820,7 +821,7 @@ bool DDLWorker::tryExecuteQueryOnLeaderReplica(
zookeeper->set(tries_to_execute_path, toString(counter + 1)); zookeeper->set(tries_to_execute_path, toString(counter + 1));
task.ops.push_back(create_shard_flag); task.ops.push_back(create_shard_flag);
SCOPE_EXIT({ if (!executed_by_us && !task.ops.empty()) task.ops.pop_back(); }); SCOPE_EXIT_MEMORY({ if (!executed_by_us && !task.ops.empty()) task.ops.pop_back(); });
/// If the leader will unexpectedly changed this method will return false /// If the leader will unexpectedly changed this method will return false
/// and on the next iteration new leader will take lock /// and on the next iteration new leader will take lock

View File

@ -82,7 +82,7 @@
#include <Common/typeid_cast.h> #include <Common/typeid_cast.h>
#include <Common/checkStackSize.h> #include <Common/checkStackSize.h>
#include <ext/map.h> #include <ext/map.h>
#include <ext/scope_guard.h> #include <ext/scope_guard_safe.h>
#include <memory> #include <memory>
@ -1401,7 +1401,7 @@ void InterpreterSelectQuery::executeFetchColumns(QueryProcessingStage::Enum proc
AggregateDataPtr place = state.data(); AggregateDataPtr place = state.data();
agg_count.create(place); agg_count.create(place);
SCOPE_EXIT(agg_count.destroy(place)); SCOPE_EXIT_MEMORY_SAFE(agg_count.destroy(place));
agg_count.set(place, *num_rows); agg_count.set(place, *num_rows);

View File

@ -316,6 +316,8 @@ void ThreadStatus::finalizeQueryProfiler()
void ThreadStatus::detachQuery(bool exit_if_already_detached, bool thread_exits) void ThreadStatus::detachQuery(bool exit_if_already_detached, bool thread_exits)
{ {
MemoryTracker::LockExceptionInThread lock;
if (exit_if_already_detached && thread_state == ThreadState::DetachedFromQuery) if (exit_if_already_detached && thread_state == ThreadState::DetachedFromQuery)
{ {
thread_state = thread_exits ? ThreadState::Died : ThreadState::DetachedFromQuery; thread_state = thread_exits ? ThreadState::Died : ThreadState::DetachedFromQuery;

View File

@ -1,14 +1,15 @@
#include <Processors/Executors/PipelineExecutor.h>
#include <queue> #include <queue>
#include <IO/WriteBufferFromString.h> #include <IO/WriteBufferFromString.h>
#include <Processors/printPipeline.h>
#include <Common/EventCounter.h> #include <Common/EventCounter.h>
#include <ext/scope_guard.h>
#include <Common/CurrentThread.h> #include <Common/CurrentThread.h>
#include <Processors/ISource.h>
#include <Common/setThreadName.h> #include <Common/setThreadName.h>
#include <Common/MemoryTracker.h>
#include <Processors/Executors/PipelineExecutor.h>
#include <Processors/printPipeline.h>
#include <Processors/ISource.h>
#include <Interpreters/ProcessList.h> #include <Interpreters/ProcessList.h>
#include <Interpreters/OpenTelemetrySpanLog.h> #include <Interpreters/OpenTelemetrySpanLog.h>
#include <ext/scope_guard_safe.h>
#ifndef NDEBUG #ifndef NDEBUG
#include <Common/Stopwatch.h> #include <Common/Stopwatch.h>
@ -740,7 +741,7 @@ void PipelineExecutor::executeImpl(size_t num_threads)
bool finished_flag = false; bool finished_flag = false;
SCOPE_EXIT( SCOPE_EXIT_SAFE(
if (!finished_flag) if (!finished_flag)
{ {
finish(); finish();
@ -766,7 +767,7 @@ void PipelineExecutor::executeImpl(size_t num_threads)
if (thread_group) if (thread_group)
CurrentThread::attachTo(thread_group); CurrentThread::attachTo(thread_group);
SCOPE_EXIT( SCOPE_EXIT_SAFE(
if (thread_group) if (thread_group)
CurrentThread::detachQueryIfNotDetached(); CurrentThread::detachQueryIfNotDetached();
); );

View File

@ -5,7 +5,7 @@
#include <Processors/QueryPipeline.h> #include <Processors/QueryPipeline.h>
#include <Common/setThreadName.h> #include <Common/setThreadName.h>
#include <ext/scope_guard.h> #include <ext/scope_guard_safe.h>
namespace DB namespace DB
{ {
@ -72,7 +72,7 @@ static void threadFunction(PullingAsyncPipelineExecutor::Data & data, ThreadGrou
if (thread_group) if (thread_group)
CurrentThread::attachTo(thread_group); CurrentThread::attachTo(thread_group);
SCOPE_EXIT( SCOPE_EXIT_SAFE(
if (thread_group) if (thread_group)
CurrentThread::detachQueryIfNotDetached(); CurrentThread::detachQueryIfNotDetached();
); );

View File

@ -2,14 +2,14 @@
#include <IO/ReadHelpers.h> #include <IO/ReadHelpers.h>
#include <Common/CurrentThread.h> #include <Common/CurrentThread.h>
#include <Common/setThreadName.h> #include <Common/setThreadName.h>
#include <ext/scope_guard.h> #include <ext/scope_guard_safe.h>
namespace DB namespace DB
{ {
void ParallelParsingInputFormat::segmentatorThreadFunction(ThreadGroupStatusPtr thread_group) void ParallelParsingInputFormat::segmentatorThreadFunction(ThreadGroupStatusPtr thread_group)
{ {
SCOPE_EXIT( SCOPE_EXIT_SAFE(
if (thread_group) if (thread_group)
CurrentThread::detachQueryIfNotDetached(); CurrentThread::detachQueryIfNotDetached();
); );
@ -60,7 +60,7 @@ void ParallelParsingInputFormat::segmentatorThreadFunction(ThreadGroupStatusPtr
void ParallelParsingInputFormat::parserThreadFunction(ThreadGroupStatusPtr thread_group, size_t current_ticket_number) void ParallelParsingInputFormat::parserThreadFunction(ThreadGroupStatusPtr thread_group, size_t current_ticket_number)
{ {
SCOPE_EXIT( SCOPE_EXIT_SAFE(
if (thread_group) if (thread_group)
CurrentThread::detachQueryIfNotDetached(); CurrentThread::detachQueryIfNotDetached();
); );

View File

@ -1,5 +1,5 @@
#include <boost/rational.hpp> /// For calculations related to sampling coefficients. #include <boost/rational.hpp> /// For calculations related to sampling coefficients.
#include <ext/scope_guard.h> #include <ext/scope_guard_safe.h>
#include <optional> #include <optional>
#include <unordered_set> #include <unordered_set>
@ -704,7 +704,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::readFromParts(
for (size_t part_index = 0; part_index < parts.size(); ++part_index) for (size_t part_index = 0; part_index < parts.size(); ++part_index)
pool.scheduleOrThrowOnError([&, part_index, thread_group = CurrentThread::getGroup()] { pool.scheduleOrThrowOnError([&, part_index, thread_group = CurrentThread::getGroup()] {
SCOPE_EXIT( SCOPE_EXIT_SAFE(
if (thread_group) if (thread_group)
CurrentThread::detachQueryIfNotDetached(); CurrentThread::detachQueryIfNotDetached();
); );

View File

@ -26,6 +26,7 @@
#include <Storages/MergeTree/ReplicatedMergeTreeQuorumAddedParts.h> #include <Storages/MergeTree/ReplicatedMergeTreeQuorumAddedParts.h>
#include <Storages/MergeTree/ReplicatedMergeTreePartHeader.h> #include <Storages/MergeTree/ReplicatedMergeTreePartHeader.h>
#include <Storages/VirtualColumnUtils.h> #include <Storages/VirtualColumnUtils.h>
#include <Storages/MergeTree/MergeTreeReaderCompact.h>
#include <Databases/IDatabase.h> #include <Databases/IDatabase.h>
@ -59,7 +60,7 @@
#include <ext/range.h> #include <ext/range.h>
#include <ext/scope_guard.h> #include <ext/scope_guard.h>
#include "Storages/MergeTree/MergeTreeReaderCompact.h" #include <ext/scope_guard_safe.h>
#include <ctime> #include <ctime>
#include <thread> #include <thread>
@ -3693,7 +3694,7 @@ bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const Stora
} }
} }
SCOPE_EXIT SCOPE_EXIT_MEMORY
({ ({
std::lock_guard lock(currently_fetching_parts_mutex); std::lock_guard lock(currently_fetching_parts_mutex);
currently_fetching_parts.erase(part_name); currently_fetching_parts.erase(part_name);
@ -3901,7 +3902,7 @@ bool StorageReplicatedMergeTree::fetchExistsPart(const String & part_name, const
} }
} }
SCOPE_EXIT SCOPE_EXIT_MEMORY
({ ({
std::lock_guard lock(currently_fetching_parts_mutex); std::lock_guard lock(currently_fetching_parts_mutex);
currently_fetching_parts.erase(part_name); currently_fetching_parts.erase(part_name);