This commit is contained in:
Dmitry Novik 2022-04-12 15:58:34 +00:00
parent 63de3c9746
commit d9d2bf310d
2 changed files with 77 additions and 54 deletions

View File

@ -73,11 +73,8 @@ bool OvercommitTracker::needToStopQuery(MemoryTracker * tracker, Int64 amount)
// If threads where not released since last call of this method, // If threads where not released since last call of this method,
// we can release them now. // we can release them now.
if (allow_release && required_memory <= freed_momory) if (allow_release && required_memory <= freed_momory && still_need != 0)
{
assert(still_need != 0);
releaseThreads(); releaseThreads();
}
// All required amount of memory is free now and selected query to stop doesn't know about it. // All required amount of memory is free now and selected query to stop doesn't know about it.
// As we don't need to free memory, we can continue execution of the selected query. // As we don't need to free memory, we can continue execution of the selected query.

View File

@ -51,15 +51,16 @@ void free_not_continue_test(T & overcommit_tracker)
for (size_t i = 0; i < THREADS; ++i) for (size_t i = 0; i < THREADS; ++i)
{ {
threads.push_back(std::thread([&, i](){ threads.push_back(std::thread(
if (overcommit_tracker.needToStopQuery(&trackers[i], 100)) [&, i]()
++need_to_stop; {
})); if (overcommit_tracker.needToStopQuery(&trackers[i], 100))
++need_to_stop;
}
));
} }
std::thread([&](){ std::thread([&]() { overcommit_tracker.tryContinueQueryExecutionAfterFree(50); }).join();
overcommit_tracker.tryContinueQueryExecutionAfterFree(50);
}).join();
for (auto & thread : threads) for (auto & thread : threads)
{ {
@ -100,15 +101,16 @@ void free_continue_test(T & overcommit_tracker)
for (size_t i = 0; i < THREADS; ++i) for (size_t i = 0; i < THREADS; ++i)
{ {
threads.push_back(std::thread([&, i](){ threads.push_back(std::thread(
if (overcommit_tracker.needToStopQuery(&trackers[i], 100)) [&, i]()
++need_to_stop; {
})); if (overcommit_tracker.needToStopQuery(&trackers[i], 100))
++need_to_stop;
}
));
} }
std::thread([&](){ std::thread([&]() { overcommit_tracker.tryContinueQueryExecutionAfterFree(5000); }).join();
overcommit_tracker.tryContinueQueryExecutionAfterFree(5000);
}).join();
for (auto & thread : threads) for (auto & thread : threads)
{ {
@ -149,18 +151,24 @@ void free_continue_and_alloc_test(T & overcommit_tracker)
for (size_t i = 0; i < THREADS; ++i) for (size_t i = 0; i < THREADS; ++i)
{ {
threads.push_back(std::thread([&, i](){ threads.push_back(std::thread(
if (overcommit_tracker.needToStopQuery(&trackers[i], 100)) [&, i]()
++need_to_stop; {
})); if (overcommit_tracker.needToStopQuery(&trackers[i], 100))
++need_to_stop;
}
));
} }
bool stopped_next = false; bool stopped_next = false;
std::thread([&](){ std::thread(
MemoryTracker failed; [&]()
overcommit_tracker.tryContinueQueryExecutionAfterFree(5000); {
stopped_next = overcommit_tracker.needToStopQuery(&failed, 100); MemoryTracker failed;
}).join(); overcommit_tracker.tryContinueQueryExecutionAfterFree(5000);
stopped_next = overcommit_tracker.needToStopQuery(&failed, 100);
}
).join();
for (auto & thread : threads) for (auto & thread : threads)
{ {
@ -202,18 +210,24 @@ void free_continue_and_alloc_2_test(T & overcommit_tracker)
for (size_t i = 0; i < THREADS; ++i) for (size_t i = 0; i < THREADS; ++i)
{ {
threads.push_back(std::thread([&, i](){ threads.push_back(std::thread(
if (overcommit_tracker.needToStopQuery(&trackers[i], 100)) [&, i]()
++need_to_stop; {
})); if (overcommit_tracker.needToStopQuery(&trackers[i], 100))
++need_to_stop;
}
));
} }
bool stopped_next = false; bool stopped_next = false;
threads.push_back(std::thread([&](){ threads.push_back(std::thread(
MemoryTracker failed; [&]()
overcommit_tracker.tryContinueQueryExecutionAfterFree(5000); {
stopped_next = overcommit_tracker.needToStopQuery(&failed, 100); MemoryTracker failed;
})); overcommit_tracker.tryContinueQueryExecutionAfterFree(5000);
stopped_next = overcommit_tracker.needToStopQuery(&failed, 100);
}
));
overcommit_tracker.tryContinueQueryExecutionAfterFree(90); overcommit_tracker.tryContinueQueryExecutionAfterFree(90);
@ -257,18 +271,24 @@ void free_continue_and_alloc_3_test(T & overcommit_tracker)
for (size_t i = 0; i < THREADS; ++i) for (size_t i = 0; i < THREADS; ++i)
{ {
threads.push_back(std::thread([&, i](){ threads.push_back(std::thread(
if (overcommit_tracker.needToStopQuery(&trackers[i], 100)) [&, i]()
++need_to_stop; {
})); if (overcommit_tracker.needToStopQuery(&trackers[i], 100))
++need_to_stop;
}
));
} }
bool stopped_next = false; bool stopped_next = false;
threads.push_back(std::thread([&](){ threads.push_back(std::thread(
MemoryTracker failed; [&]()
overcommit_tracker.tryContinueQueryExecutionAfterFree(5000); {
stopped_next = overcommit_tracker.needToStopQuery(&failed, 100); MemoryTracker failed;
})); overcommit_tracker.tryContinueQueryExecutionAfterFree(5000);
stopped_next = overcommit_tracker.needToStopQuery(&failed, 100);
}
));
overcommit_tracker.tryContinueQueryExecutionAfterFree(100); overcommit_tracker.tryContinueQueryExecutionAfterFree(100);
@ -312,15 +332,18 @@ void free_continue_2_test(T & overcommit_tracker)
for (size_t i = 0; i < THREADS; ++i) for (size_t i = 0; i < THREADS; ++i)
{ {
threads.push_back(std::thread([&, i](){ threads.push_back(std::thread(
if (overcommit_tracker.needToStopQuery(&trackers[i], 100)) [&, i]()
++need_to_stop; {
})); if (overcommit_tracker.needToStopQuery(&trackers[i], 100))
++need_to_stop;
}
));
} }
std::thread([&](){ std::thread(
overcommit_tracker.tryContinueQueryExecutionAfterFree(300); [&]() { overcommit_tracker.tryContinueQueryExecutionAfterFree(300); }
}).join(); ).join();
for (auto & thread : threads) for (auto & thread : threads)
{ {
@ -356,10 +379,13 @@ void query_stop_not_continue_test(T & overcommit_tracker)
overcommit_tracker.setCandidate(&picked); overcommit_tracker.setCandidate(&picked);
MemoryTracker another; MemoryTracker another;
auto thread = std::thread([&](){ auto thread = std::thread(
[&]()
{
if (overcommit_tracker.needToStopQuery(&another, 100)) if (overcommit_tracker.needToStopQuery(&another, 100))
++need_to_stop; ++need_to_stop;
}); }
);
overcommit_tracker.onQueryStop(&picked); overcommit_tracker.onQueryStop(&picked);
thread.join(); thread.join();