diff --git a/src/Common/OvercommitTracker.cpp b/src/Common/OvercommitTracker.cpp index 69f0f2a616c..45ca77e6ef2 100644 --- a/src/Common/OvercommitTracker.cpp +++ b/src/Common/OvercommitTracker.cpp @@ -73,11 +73,8 @@ bool OvercommitTracker::needToStopQuery(MemoryTracker * tracker, Int64 amount) // If threads where not released since last call of this method, // we can release them now. - if (allow_release && required_memory <= freed_momory) - { - assert(still_need != 0); + if (allow_release && required_memory <= freed_momory && still_need != 0) releaseThreads(); - } // All required amount of memory is free now and selected query to stop doesn't know about it. // As we don't need to free memory, we can continue execution of the selected query. diff --git a/src/Common/tests/gtest_overcommit_tracker.cpp b/src/Common/tests/gtest_overcommit_tracker.cpp index ecf4de54e6a..07fadc6a337 100644 --- a/src/Common/tests/gtest_overcommit_tracker.cpp +++ b/src/Common/tests/gtest_overcommit_tracker.cpp @@ -51,15 +51,16 @@ void free_not_continue_test(T & overcommit_tracker) for (size_t i = 0; i < THREADS; ++i) { - threads.push_back(std::thread([&, i](){ - if (overcommit_tracker.needToStopQuery(&trackers[i], 100)) - ++need_to_stop; - })); + threads.push_back(std::thread( + [&, i]() + { + if (overcommit_tracker.needToStopQuery(&trackers[i], 100)) + ++need_to_stop; + } + )); } - std::thread([&](){ - overcommit_tracker.tryContinueQueryExecutionAfterFree(50); - }).join(); + std::thread([&]() { overcommit_tracker.tryContinueQueryExecutionAfterFree(50); }).join(); for (auto & thread : threads) { @@ -100,15 +101,16 @@ void free_continue_test(T & overcommit_tracker) for (size_t i = 0; i < THREADS; ++i) { - threads.push_back(std::thread([&, i](){ - if (overcommit_tracker.needToStopQuery(&trackers[i], 100)) - ++need_to_stop; - })); + threads.push_back(std::thread( + [&, i]() + { + if (overcommit_tracker.needToStopQuery(&trackers[i], 100)) + ++need_to_stop; + } + )); } - std::thread([&](){ - overcommit_tracker.tryContinueQueryExecutionAfterFree(5000); - }).join(); + std::thread([&]() { overcommit_tracker.tryContinueQueryExecutionAfterFree(5000); }).join(); for (auto & thread : threads) { @@ -149,18 +151,24 @@ void free_continue_and_alloc_test(T & overcommit_tracker) for (size_t i = 0; i < THREADS; ++i) { - threads.push_back(std::thread([&, i](){ - if (overcommit_tracker.needToStopQuery(&trackers[i], 100)) - ++need_to_stop; - })); + threads.push_back(std::thread( + [&, i]() + { + if (overcommit_tracker.needToStopQuery(&trackers[i], 100)) + ++need_to_stop; + } + )); } bool stopped_next = false; - std::thread([&](){ - MemoryTracker failed; - overcommit_tracker.tryContinueQueryExecutionAfterFree(5000); - stopped_next = overcommit_tracker.needToStopQuery(&failed, 100); - }).join(); + std::thread( + [&]() + { + MemoryTracker failed; + overcommit_tracker.tryContinueQueryExecutionAfterFree(5000); + stopped_next = overcommit_tracker.needToStopQuery(&failed, 100); + } + ).join(); for (auto & thread : threads) { @@ -202,18 +210,24 @@ void free_continue_and_alloc_2_test(T & overcommit_tracker) for (size_t i = 0; i < THREADS; ++i) { - threads.push_back(std::thread([&, i](){ - if (overcommit_tracker.needToStopQuery(&trackers[i], 100)) - ++need_to_stop; - })); + threads.push_back(std::thread( + [&, i]() + { + if (overcommit_tracker.needToStopQuery(&trackers[i], 100)) + ++need_to_stop; + } + )); } bool stopped_next = false; - threads.push_back(std::thread([&](){ - MemoryTracker failed; - overcommit_tracker.tryContinueQueryExecutionAfterFree(5000); - stopped_next = overcommit_tracker.needToStopQuery(&failed, 100); - })); + threads.push_back(std::thread( + [&]() + { + MemoryTracker failed; + overcommit_tracker.tryContinueQueryExecutionAfterFree(5000); + stopped_next = overcommit_tracker.needToStopQuery(&failed, 100); + } + )); overcommit_tracker.tryContinueQueryExecutionAfterFree(90); @@ -257,18 +271,24 @@ void free_continue_and_alloc_3_test(T & overcommit_tracker) for (size_t i = 0; i < THREADS; ++i) { - threads.push_back(std::thread([&, i](){ - if (overcommit_tracker.needToStopQuery(&trackers[i], 100)) - ++need_to_stop; - })); + threads.push_back(std::thread( + [&, i]() + { + if (overcommit_tracker.needToStopQuery(&trackers[i], 100)) + ++need_to_stop; + } + )); } bool stopped_next = false; - threads.push_back(std::thread([&](){ - MemoryTracker failed; - overcommit_tracker.tryContinueQueryExecutionAfterFree(5000); - stopped_next = overcommit_tracker.needToStopQuery(&failed, 100); - })); + threads.push_back(std::thread( + [&]() + { + MemoryTracker failed; + overcommit_tracker.tryContinueQueryExecutionAfterFree(5000); + stopped_next = overcommit_tracker.needToStopQuery(&failed, 100); + } + )); overcommit_tracker.tryContinueQueryExecutionAfterFree(100); @@ -312,15 +332,18 @@ void free_continue_2_test(T & overcommit_tracker) for (size_t i = 0; i < THREADS; ++i) { - threads.push_back(std::thread([&, i](){ - if (overcommit_tracker.needToStopQuery(&trackers[i], 100)) - ++need_to_stop; - })); + threads.push_back(std::thread( + [&, i]() + { + if (overcommit_tracker.needToStopQuery(&trackers[i], 100)) + ++need_to_stop; + } + )); } - std::thread([&](){ - overcommit_tracker.tryContinueQueryExecutionAfterFree(300); - }).join(); + std::thread( + [&]() { overcommit_tracker.tryContinueQueryExecutionAfterFree(300); } + ).join(); for (auto & thread : threads) { @@ -356,10 +379,13 @@ void query_stop_not_continue_test(T & overcommit_tracker) overcommit_tracker.setCandidate(&picked); MemoryTracker another; - auto thread = std::thread([&](){ + auto thread = std::thread( + [&]() + { if (overcommit_tracker.needToStopQuery(&another, 100)) ++need_to_stop; - }); + } + ); overcommit_tracker.onQueryStop(&picked); thread.join();