mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-29 19:12:03 +00:00
fix uaf of async_insert_queue
This commit is contained in:
parent
5d7e5e5a72
commit
1913466da4
@ -218,7 +218,7 @@ AsynchronousInsertQueue::AsynchronousInsertQueue(ContextPtr context_, size_t poo
|
||||
dump_by_first_update_threads.emplace_back([this, i] { processBatchDeadlines(i); });
|
||||
}
|
||||
|
||||
AsynchronousInsertQueue::~AsynchronousInsertQueue()
|
||||
void AsynchronousInsertQueue::flushAndShutdown()
|
||||
{
|
||||
try
|
||||
{
|
||||
@ -257,6 +257,19 @@ AsynchronousInsertQueue::~AsynchronousInsertQueue()
|
||||
}
|
||||
}
|
||||
|
||||
AsynchronousInsertQueue::~AsynchronousInsertQueue()
|
||||
{
|
||||
for (const auto & shard : queue_shards)
|
||||
{
|
||||
for (const auto & [first_update, elem] : shard.queue)
|
||||
{
|
||||
const auto & insert_query = elem.key.query->as<const ASTInsertQuery &>();
|
||||
LOG_WARNING(log, "Has unprocessed async insert for {}.{}",
|
||||
backQuoteIfNeed(insert_query.getDatabase()), backQuoteIfNeed(insert_query.getTable()));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void AsynchronousInsertQueue::scheduleDataProcessingJob(
|
||||
const InsertQuery & key, InsertDataPtr data, ContextPtr global_context, size_t shard_num)
|
||||
{
|
||||
|
@ -63,6 +63,8 @@ public:
|
||||
PushResult pushQueryWithBlock(ASTPtr query, Block block, ContextPtr query_context);
|
||||
size_t getPoolSize() const { return pool_size; }
|
||||
|
||||
void flushAndShutdown();
|
||||
|
||||
private:
|
||||
|
||||
struct InsertQuery
|
||||
|
@ -566,7 +566,7 @@ struct ContextSharedPart : boost::noncopyable
|
||||
std::lock_guard lock(mutex);
|
||||
delete_async_insert_queue = std::move(async_insert_queue);
|
||||
}
|
||||
delete_async_insert_queue.reset();
|
||||
delete_async_insert_queue->flushAndShutdown();
|
||||
|
||||
/// Stop periodic reloading of the configuration files.
|
||||
/// This must be done first because otherwise the reloading may pass a changed config
|
||||
@ -590,6 +590,8 @@ struct ContextSharedPart : boost::noncopyable
|
||||
LOG_TRACE(log, "Shutting down database catalog");
|
||||
DatabaseCatalog::shutdown();
|
||||
|
||||
delete_async_insert_queue.reset();
|
||||
|
||||
SHUTDOWN(log, "merges executor", merge_mutate_executor, wait());
|
||||
SHUTDOWN(log, "fetches executor", fetch_executor, wait());
|
||||
SHUTDOWN(log, "moves executor", moves_executor, wait());
|
||||
@ -4990,7 +4992,7 @@ PartUUIDsPtr Context::getIgnoredPartUUIDs() const
|
||||
|
||||
AsynchronousInsertQueue * Context::tryGetAsynchronousInsertQueue() const
|
||||
{
|
||||
std::lock_guard lock(mutex);
|
||||
SharedLockGuard lock(shared->mutex);
|
||||
return shared->async_insert_queue.get();
|
||||
}
|
||||
|
||||
@ -4998,7 +5000,7 @@ void Context::setAsynchronousInsertQueue(const std::shared_ptr<AsynchronousInser
|
||||
{
|
||||
AsynchronousInsertQueue::validateSettings(settings, getLogger("Context"));
|
||||
|
||||
std::lock_guard lock(mutex);
|
||||
SharedLockGuard lock(shared->mutex);
|
||||
|
||||
if (std::chrono::milliseconds(settings.async_insert_poll_timeout_ms) == std::chrono::milliseconds::zero())
|
||||
throw Exception(ErrorCodes::INVALID_SETTING_VALUE, "Setting async_insert_poll_timeout_ms can't be zero");
|
||||
|
Loading…
Reference in New Issue
Block a user