Fixed tests

This commit is contained in:
Maksim Kita 2021-10-07 11:21:01 +03:00
parent c9b6c2661b
commit afdc2fe50d
3 changed files with 25 additions and 4 deletions

View File

@ -76,11 +76,13 @@ private:
pop_condition.wait(queue_lock, [&](){ return is_finished || !queue.empty(); }); pop_condition.wait(queue_lock, [&](){ return is_finished || !queue.empty(); });
} }
if (is_finished) if (is_finished && queue.empty())
return false; return false;
detail::moveOrCopyIfThrow(std::move(queue.front()), x); detail::moveOrCopyIfThrow(std::move(queue.front()), x);
queue.pop(); queue.pop();
return true;
} }
push_condition.notify_one(); push_condition.notify_one();
@ -147,7 +149,8 @@ public:
} }
/** Clear and finish queue /** Clear and finish queue
* After that push or pop operations will return false * After that push operation will return false
* pop operations will return values until queue become empty
* Returns true if queue was already finished * Returns true if queue was already finished
*/ */
bool finish() bool finish()
@ -193,4 +196,22 @@ public:
push_condition.notify_all(); push_condition.notify_all();
} }
/// Clear and finish queue
void clearAndFinish()
{
{
std::lock_guard<std::mutex> lock(queue_mutex);
if (is_finished)
return;
std::queue<T> empty_queue;
queue.swap(empty_queue);
is_finished = true;
}
pop_condition.notify_all();
push_condition.notify_all();
}
}; };

View File

@ -42,7 +42,7 @@ public:
std::chrono::milliseconds(timeout_ms), [this] { return is_finished || !queue.empty(); })) std::chrono::milliseconds(timeout_ms), [this] { return is_finished || !queue.empty(); }))
return false; return false;
if (is_finished) if (is_finished && queue.empty())
return false; return false;
::detail::moveOrCopyIfThrow(std::move(queue.front()), response); ::detail::moveOrCopyIfThrow(std::move(queue.front()), response);

View File

@ -113,7 +113,7 @@ void CacheDictionaryUpdateQueue<dictionary_key_type>::stopAndWait()
if (update_queue.isFinished()) if (update_queue.isFinished())
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "CacheDictionaryUpdateQueue finished"); throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "CacheDictionaryUpdateQueue finished");
update_queue.finish(); update_queue.clearAndFinish();
update_pool.wait(); update_pool.wait();
} }