Fix problem with use-after-free inside shared_ptr

This commit is contained in:
Ivan Lezhankin 2021-04-21 16:16:16 +03:00
parent ee194928d2
commit 484528ba2c
7 changed files with 15 additions and 14 deletions

View File

@ -847,10 +847,11 @@ int Server::main(const std::vector<std::string> & /*args*/)
global_context->setDefaultProfiles(config());
const Settings & settings = global_context->getSettingsRef();
global_context->setAsynchronousInsertQueue(std::make_shared<AsynchronousInsertQueue>(
settings.async_insert_threads,
settings.async_insert_max_data_size,
AsynchronousInsertQueue::Timeout{.busy = settings.async_insert_busy_timeout, .stale = settings.async_insert_stale_timeout}));
if (settings.async_insert_threads)
global_context->setAsynchronousInsertQueue(std::make_shared<AsynchronousInsertQueue>(
settings.async_insert_threads,
settings.async_insert_max_data_size,
AsynchronousInsertQueue::Timeout{.busy = settings.async_insert_busy_timeout, .stale = settings.async_insert_stale_timeout}));
/// Size of cache for marks (index of MergeTree family of tables). It is mandatory.
size_t mark_cache_size = config().getUInt64("mark_cache_size");

View File

@ -541,7 +541,7 @@ class IColumn;
M(Bool, output_format_pretty_row_numbers, false, "Add row numbers before each row for pretty output format", 0) \
M(Bool, insert_distributed_one_random_shard, false, "If setting is enabled, inserting into distributed table will choose a random shard to write when there is no sharding key", 0) \
\
M(UInt64, async_insert_threads, 16, "Maximum number of threads to actually parse and insert data in background", 0) \
M(UInt64, async_insert_threads, 16, "Maximum number of threads to actually parse and insert data in background. Zero means asynchronous mode is disabled", 0) \
M(Bool, async_insert_mode, false, "Insert query is processed almost instantly, but an actual data queued for later asynchronous insertion", 0) \
M(UInt64, async_insert_max_data_size, 1000000, "Maximum size in bytes of unparsed data collected per query before being inserted", 0) \
M(Seconds, async_insert_busy_timeout, 1, "Maximum time to wait before dumping collected data per query since the first data appeared", 0) \

View File

@ -78,6 +78,8 @@ AsynchronousInsertQueue::AsynchronousInsertQueue(size_t pool_size, size_t max_da
{
using namespace std::chrono;
assert(pool_size);
if (stale_timeout > 0s)
dump_by_last_update_thread = ThreadFromGlobalPool(&AsynchronousInsertQueue::staleCheck, this);
}

View File

@ -2645,10 +2645,9 @@ PartUUIDsPtr Context::getIgnoredPartUUIDs()
return ignored_part_uuids;
}
AsynchronousInsertQueue & Context::getAsynchronousInsertQueue() const
AsynchronousInsertQueue * Context::getAsynchronousInsertQueue() const
{
/// Assume that dereference of shared_ptr will assert on uninitialized object.
return *shared->async_insert_queue;
return shared->async_insert_queue.get();
}
void Context::setAsynchronousInsertQueue(const std::shared_ptr<AsynchronousInsertQueue> & ptr)

View File

@ -776,7 +776,7 @@ public:
PartUUIDsPtr getPartUUIDs();
PartUUIDsPtr getIgnoredPartUUIDs();
AsynchronousInsertQueue & getAsynchronousInsertQueue() const;
AsynchronousInsertQueue * getAsynchronousInsertQueue() const;
void setAsynchronousInsertQueue(const std::shared_ptr<AsynchronousInsertQueue> & ptr);
ReadTaskCallback getReadTaskCallback() const;

View File

@ -382,7 +382,6 @@ void ThreadStatus::detachQuery(bool exit_if_already_detached, bool thread_exits)
query_id.clear();
query_context.reset();
thread_group->query_context.reset();
thread_trace_context.trace_id = 0;
thread_trace_context.span_id = 0;
thread_group.reset();

View File

@ -564,11 +564,11 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
limits.size_limits = SizeLimits(settings.max_result_rows, settings.max_result_bytes, settings.result_overflow_mode);
}
auto * queue = context->getAsynchronousInsertQueue();
const bool async_insert
= insert_query && !insert_query->select && (insert_query->data || insert_query->tail) && settings.async_insert_mode;
auto & queue = context->getAsynchronousInsertQueue();
= queue && insert_query && !insert_query->select && (insert_query->data || insert_query->tail) && settings.async_insert_mode;
if (async_insert && queue.push(insert_query, settings))
if (async_insert && queue->push(insert_query, settings))
{
/// Shortcut for already processed similar insert-queries.
/// Similarity is defined by hashing query text and some settings.
@ -911,7 +911,7 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
if (async_insert)
{
queue.push(insert_query, std::move(res), settings);
queue->push(insert_query, std::move(res), settings);
return std::make_tuple(ast, BlockIO());
}
else if (insert_query && res.in)