mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-20 08:40:50 +00:00
Fix lock order
This commit is contained in:
parent
31a5e85aa4
commit
bb6dad7d0e
@ -179,6 +179,11 @@ public:
|
||||
overcommit_tracker = tracker;
|
||||
}
|
||||
|
||||
OvercommitTracker * getOvercommitTracker() noexcept
|
||||
{
|
||||
return overcommit_tracker;
|
||||
}
|
||||
|
||||
/// Reset the accumulated data
|
||||
void resetCounters();
|
||||
|
||||
|
@ -91,6 +91,8 @@ void GlobalOvercommitTracker::pickQueryToExcludeImpl()
|
||||
{
|
||||
MemoryTracker * query_tracker = nullptr;
|
||||
OvercommitRatio current_ratio{0, 0};
|
||||
// At this moment query list must be read only.
|
||||
// BlockQueryIfMemoryLimit is used in ProcessList to guarantee this.
|
||||
process_list->processEachQueryStatus([&](DB::QueryStatus const & query)
|
||||
{
|
||||
if (query.isKilled())
|
||||
|
@ -134,15 +134,15 @@ private:
|
||||
// list immutable when UserOvercommitTracker reads it.
|
||||
struct BlockQueryIfMemoryLimit
|
||||
{
|
||||
BlockQueryIfMemoryLimit(OvercommitTracker const & overcommit_tracker)
|
||||
: mutex(overcommit_tracker.overcommit_m)
|
||||
BlockQueryIfMemoryLimit(OvercommitTracker * overcommit_tracker)
|
||||
: mutex(overcommit_tracker->overcommit_m)
|
||||
, lk(mutex)
|
||||
{
|
||||
if (overcommit_tracker.cancelation_state == OvercommitTracker::QueryCancelationState::RUNNING)
|
||||
if (overcommit_tracker->cancelation_state == OvercommitTracker::QueryCancelationState::RUNNING)
|
||||
{
|
||||
overcommit_tracker.cv.wait_for(lk, overcommit_tracker.max_wait_time, [&overcommit_tracker]()
|
||||
overcommit_tracker->cv.wait_for(lk, overcommit_tracker->max_wait_time, [&overcommit_tracker]()
|
||||
{
|
||||
return overcommit_tracker.cancelation_state == OvercommitTracker::QueryCancelationState::NONE;
|
||||
return overcommit_tracker->cancelation_state == OvercommitTracker::QueryCancelationState::NONE;
|
||||
});
|
||||
}
|
||||
}
|
||||
|
@ -9,6 +9,9 @@
|
||||
#include <Parsers/IAST.h>
|
||||
#include <Parsers/queryNormalization.h>
|
||||
#include <Processors/Executors/PipelineExecutor.h>
|
||||
#include "Common/MemoryTracker.h"
|
||||
#include "Common/OvercommitTracker.h"
|
||||
#include "Common/tests/gtest_global_context.h"
|
||||
#include <Common/typeid_cast.h>
|
||||
#include <Common/Exception.h>
|
||||
#include <Common/CurrentThread.h>
|
||||
@ -226,8 +229,12 @@ ProcessList::EntryPtr ProcessList::insert(const String & query_, const IAST * as
|
||||
/// since allocation and deallocation could happen in different threads
|
||||
}
|
||||
|
||||
auto process_it = processes.emplace(processes.end(),
|
||||
query_context, query_, client_info, priorities.insert(settings.priority), std::move(thread_group), query_kind);
|
||||
Container::iterator process_it;
|
||||
{
|
||||
BlockQueryIfMemoryLimit block_query{total_memory_tracker.getOvercommitTracker()};
|
||||
process_it = processes.emplace(processes.end(),
|
||||
query_context, query_, client_info, priorities.insert(settings.priority), std::move(thread_group), query_kind);
|
||||
}
|
||||
|
||||
increaseQueryKindAmount(query_kind);
|
||||
|
||||
@ -236,7 +243,7 @@ ProcessList::EntryPtr ProcessList::insert(const String & query_, const IAST * as
|
||||
process_it->setUserProcessList(&user_process_list);
|
||||
|
||||
{
|
||||
BlockQueryIfMemoryLimit block_query{user_process_list.user_overcommit_tracker};
|
||||
BlockQueryIfMemoryLimit block_query{&user_process_list.user_overcommit_tracker};
|
||||
user_process_list.queries.emplace(client_info.current_query_id, &res->get());
|
||||
}
|
||||
|
||||
@ -289,14 +296,17 @@ ProcessListEntry::~ProcessListEntry()
|
||||
{
|
||||
if (running_query->second == process_list_element_ptr)
|
||||
{
|
||||
BlockQueryIfMemoryLimit block_query{user_process_list.user_overcommit_tracker};
|
||||
BlockQueryIfMemoryLimit block_query{&user_process_list.user_overcommit_tracker};
|
||||
user_process_list.queries.erase(running_query->first);
|
||||
found = true;
|
||||
}
|
||||
}
|
||||
|
||||
/// This removes the memory_tracker of one request.
|
||||
parent.processes.erase(it);
|
||||
{
|
||||
BlockQueryIfMemoryLimit block_query{total_memory_tracker.getOvercommitTracker()};
|
||||
/// This removes the memory_tracker of one request.
|
||||
parent.processes.erase(it);
|
||||
}
|
||||
|
||||
if (!found)
|
||||
{
|
||||
|
@ -349,10 +349,11 @@ public:
|
||||
max_size = max_size_;
|
||||
}
|
||||
|
||||
// Before calling this method you should be sure
|
||||
// that lock is acquired.
|
||||
template <typename F>
|
||||
void processEachQueryStatus(F && func) const
|
||||
{
|
||||
std::lock_guard lk(mutex);
|
||||
for (auto && query : processes)
|
||||
func(query);
|
||||
}
|
||||
|
@ -1,4 +1,4 @@
|
||||
<clickhouse>
|
||||
<max_server_memory_usage>5000000</max_server_memory_usage>
|
||||
<global_memory_usage_overcommit_max_wait_microseconds>200</global_memory_usage_overcommit_max_wait_microseconds>
|
||||
<max_server_memory_usage>50000000</max_server_memory_usage>
|
||||
<global_memory_usage_overcommit_max_wait_microseconds>500</global_memory_usage_overcommit_max_wait_microseconds>
|
||||
</clickhouse>
|
Loading…
Reference in New Issue
Block a user