From 484528ba2c9f09601f80375fb76967b888430aba Mon Sep 17 00:00:00 2001 From: Ivan Lezhankin Date: Wed, 21 Apr 2021 16:16:16 +0300 Subject: [PATCH] Fix problem with use-after-free inside shared_ptr --- programs/server/Server.cpp | 9 +++++---- src/Core/Settings.h | 2 +- src/IO/AsynchronousInsertionQueue.cpp | 2 ++ src/Interpreters/Context.cpp | 5 ++--- src/Interpreters/Context.h | 2 +- src/Interpreters/ThreadStatusExt.cpp | 1 - src/Interpreters/executeQuery.cpp | 8 ++++---- 7 files changed, 15 insertions(+), 14 deletions(-) diff --git a/programs/server/Server.cpp b/programs/server/Server.cpp index d89341a7057..df9154c2c89 100644 --- a/programs/server/Server.cpp +++ b/programs/server/Server.cpp @@ -847,10 +847,11 @@ int Server::main(const std::vector & /*args*/) global_context->setDefaultProfiles(config()); const Settings & settings = global_context->getSettingsRef(); - global_context->setAsynchronousInsertQueue(std::make_shared( - settings.async_insert_threads, - settings.async_insert_max_data_size, - AsynchronousInsertQueue::Timeout{.busy = settings.async_insert_busy_timeout, .stale = settings.async_insert_stale_timeout})); + if (settings.async_insert_threads) + global_context->setAsynchronousInsertQueue(std::make_shared( + settings.async_insert_threads, + settings.async_insert_max_data_size, + AsynchronousInsertQueue::Timeout{.busy = settings.async_insert_busy_timeout, .stale = settings.async_insert_stale_timeout})); /// Size of cache for marks (index of MergeTree family of tables). It is mandatory. size_t mark_cache_size = config().getUInt64("mark_cache_size"); diff --git a/src/Core/Settings.h b/src/Core/Settings.h index dd92106a824..3f7545673e9 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -541,7 +541,7 @@ class IColumn; M(Bool, output_format_pretty_row_numbers, false, "Add row numbers before each row for pretty output format", 0) \ M(Bool, insert_distributed_one_random_shard, false, "If setting is enabled, inserting into distributed table will choose a random shard to write when there is no sharding key", 0) \ \ - M(UInt64, async_insert_threads, 16, "Maximum number of threads to actually parse and insert data in background", 0) \ + M(UInt64, async_insert_threads, 16, "Maximum number of threads to actually parse and insert data in background. Zero means asynchronous mode is disabled", 0) \ M(Bool, async_insert_mode, false, "Insert query is processed almost instantly, but an actual data queued for later asynchronous insertion", 0) \ M(UInt64, async_insert_max_data_size, 1000000, "Maximum size in bytes of unparsed data collected per query before being inserted", 0) \ M(Seconds, async_insert_busy_timeout, 1, "Maximum time to wait before dumping collected data per query since the first data appeared", 0) \ diff --git a/src/IO/AsynchronousInsertionQueue.cpp b/src/IO/AsynchronousInsertionQueue.cpp index 332042961bc..72793132b1d 100644 --- a/src/IO/AsynchronousInsertionQueue.cpp +++ b/src/IO/AsynchronousInsertionQueue.cpp @@ -78,6 +78,8 @@ AsynchronousInsertQueue::AsynchronousInsertQueue(size_t pool_size, size_t max_da { using namespace std::chrono; + assert(pool_size); + if (stale_timeout > 0s) dump_by_last_update_thread = ThreadFromGlobalPool(&AsynchronousInsertQueue::staleCheck, this); } diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp index 1bbbd2301e0..21b836a0a91 100644 --- a/src/Interpreters/Context.cpp +++ b/src/Interpreters/Context.cpp @@ -2645,10 +2645,9 @@ PartUUIDsPtr Context::getIgnoredPartUUIDs() return ignored_part_uuids; } -AsynchronousInsertQueue & Context::getAsynchronousInsertQueue() const +AsynchronousInsertQueue * Context::getAsynchronousInsertQueue() const { - /// Assume that dereference of shared_ptr will assert on uninitialized object. - return *shared->async_insert_queue; + return shared->async_insert_queue.get(); } void Context::setAsynchronousInsertQueue(const std::shared_ptr & ptr) diff --git a/src/Interpreters/Context.h b/src/Interpreters/Context.h index 1ddabccded7..944c9ed3bd3 100644 --- a/src/Interpreters/Context.h +++ b/src/Interpreters/Context.h @@ -776,7 +776,7 @@ public: PartUUIDsPtr getPartUUIDs(); PartUUIDsPtr getIgnoredPartUUIDs(); - AsynchronousInsertQueue & getAsynchronousInsertQueue() const; + AsynchronousInsertQueue * getAsynchronousInsertQueue() const; void setAsynchronousInsertQueue(const std::shared_ptr & ptr); ReadTaskCallback getReadTaskCallback() const; diff --git a/src/Interpreters/ThreadStatusExt.cpp b/src/Interpreters/ThreadStatusExt.cpp index d0a225a9db7..c04534e11a1 100644 --- a/src/Interpreters/ThreadStatusExt.cpp +++ b/src/Interpreters/ThreadStatusExt.cpp @@ -382,7 +382,6 @@ void ThreadStatus::detachQuery(bool exit_if_already_detached, bool thread_exits) query_id.clear(); query_context.reset(); - thread_group->query_context.reset(); thread_trace_context.trace_id = 0; thread_trace_context.span_id = 0; thread_group.reset(); diff --git a/src/Interpreters/executeQuery.cpp b/src/Interpreters/executeQuery.cpp index 0ccb914ea20..6f49f219b5e 100644 --- a/src/Interpreters/executeQuery.cpp +++ b/src/Interpreters/executeQuery.cpp @@ -564,11 +564,11 @@ static std::tuple executeQueryImpl( limits.size_limits = SizeLimits(settings.max_result_rows, settings.max_result_bytes, settings.result_overflow_mode); } + auto * queue = context->getAsynchronousInsertQueue(); const bool async_insert - = insert_query && !insert_query->select && (insert_query->data || insert_query->tail) && settings.async_insert_mode; - auto & queue = context->getAsynchronousInsertQueue(); + = queue && insert_query && !insert_query->select && (insert_query->data || insert_query->tail) && settings.async_insert_mode; - if (async_insert && queue.push(insert_query, settings)) + if (async_insert && queue->push(insert_query, settings)) { /// Shortcut for already processed similar insert-queries. /// Similarity is defined by hashing query text and some settings. @@ -911,7 +911,7 @@ static std::tuple executeQueryImpl( if (async_insert) { - queue.push(insert_query, std::move(res), settings); + queue->push(insert_query, std::move(res), settings); return std::make_tuple(ast, BlockIO()); } else if (insert_query && res.in)