Fixed tests

This commit is contained in:
Maksim Kita 2021-10-07 20:13:56 +03:00
parent afdc2fe50d
commit aae409b321
2 changed files with 25 additions and 37 deletions

View File

@ -5,6 +5,7 @@
#include <atomic>
#include <condition_variable>
#include <mutex>
#include <optional>
#include <base/MoveOrCopyIfThrow.h>
@ -28,16 +29,16 @@ private:
size_t max_fill = 0;
template <typename ... Args>
bool emplaceImpl(bool wait_on_timeout, UInt64 timeout_milliseconds = 0, Args &&...args)
bool emplaceImpl(std::optional<UInt64> timeout_milliseconds, Args &&...args)
{
{
std::unique_lock<std::mutex> queue_lock(queue_mutex);
auto predicate = [&]() { return is_finished || queue.size() < max_fill; };
if (wait_on_timeout)
if (timeout_milliseconds.has_value())
{
bool wait_result = push_condition.wait_for(queue_lock, std::chrono::milliseconds(timeout_milliseconds), predicate);
bool wait_result = push_condition.wait_for(queue_lock, std::chrono::milliseconds(timeout_milliseconds.value()), predicate);
if (!wait_result)
return false;
@ -57,23 +58,23 @@ private:
return true;
}
bool popImpl(T & x, bool wait_on_timeout, UInt64 timeout_milliseconds = 0)
bool popImpl(T & x, std::optional<UInt64> timeout_milliseconds)
{
{
std::unique_lock<std::mutex> queue_lock(queue_mutex);
auto predicate = [&]() { return is_finished || !queue.empty(); };
if (wait_on_timeout)
if (timeout_milliseconds.has_value())
{
bool wait_result = pop_condition.wait_for(queue_lock, std::chrono::milliseconds(timeout_milliseconds), predicate);
bool wait_result = pop_condition.wait_for(queue_lock, std::chrono::milliseconds(timeout_milliseconds.value()), predicate);
if (!wait_result)
return false;
}
else
{
pop_condition.wait(queue_lock, [&](){ return is_finished || !queue.empty(); });
pop_condition.wait(queue_lock, predicate);
}
if (is_finished && queue.empty())
@ -81,8 +82,6 @@ private:
detail::moveOrCopyIfThrow(std::move(queue.front()), x);
queue.pop();
return true;
}
push_condition.notify_one();
@ -105,33 +104,33 @@ public:
template <typename... Args>
bool emplace(Args &&... args)
{
emplaceImpl(false /*wait on timeout*/, 0 /* timeout in milliseconds */, std::forward<Args...>(args...));
emplaceImpl(std::nullopt /* timeout in milliseconds */, std::forward<Args...>(args...));
return true;
}
/// Returns false if queue is finished
/// Returns false if queue is finished and empty
[[nodiscard]] bool pop(T & x)
{
return popImpl(x, false /* wait on timeout*/);
return popImpl(x, std::nullopt /*timeout in milliseconds*/);
}
/// Returns false if queue is finished or object was not pushed during timeout
bool tryPush(const T & x, UInt64 milliseconds = 0)
{
return emplaceImpl(true /*wait on timeout*/, milliseconds, x);
return emplaceImpl(milliseconds, x);
}
/// Returns false if queue is finished or object was not emplaced during timeout
template <typename... Args>
bool tryEmplace(UInt64 milliseconds, Args &&... args)
{
return emplaceImpl(true /*wait on timeout*/, milliseconds, std::forward<Args...>(args...));
return emplaceImpl(milliseconds, std::forward<Args...>(args...));
}
/// Returns false if queue is finished or object was not popped during timeout
/// Returns false if queue is (finished and empty) or (object was not popped during timeout)
[[nodiscard]] bool tryPop(T & x, UInt64 milliseconds = 0)
{
return popImpl(x, true /*wait on timeout*/, milliseconds);
return popImpl(x, milliseconds);
}
/// Returns size of queue
@ -163,9 +162,6 @@ public:
if (is_finished)
return true;
std::queue<T> empty_queue;
queue.swap(empty_queue);
was_finished_before = is_finished;
is_finished = true;
}
@ -186,13 +182,15 @@ public:
/// Clear queue
void clear()
{
std::lock_guard<std::mutex> lock(queue_mutex);
{
std::lock_guard<std::mutex> lock(queue_mutex);
if (is_finished)
return;
if (is_finished)
return;
std::queue<T> empty_queue;
queue.swap(empty_queue);
std::queue<T> empty_queue;
queue.swap(empty_queue);
}
push_condition.notify_all();
}
@ -203,9 +201,6 @@ public:
{
std::lock_guard<std::mutex> lock(queue_mutex);
if (is_finished)
return;
std::queue<T> empty_queue;
queue.swap(empty_queue);
is_finished = true;

View File

@ -317,16 +317,9 @@ void KeeperDispatcher::shutdown()
/// Set session expired for all pending requests
while (requests_queue && requests_queue->tryPop(request_for_session))
{
if (request_for_session.request)
{
auto response = request_for_session.request->makeResponse();
response->error = Coordination::Error::ZSESSIONEXPIRED;
setResponse(request_for_session.session_id, response);
}
else
{
break;
}
auto response = request_for_session.request->makeResponse();
response->error = Coordination::Error::ZSESSIONEXPIRED;
setResponse(request_for_session.session_id, response);
}
/// Clear all registered sessions