ClickHouse/src/Common/ConcurrentBoundedQueue.h

197 lines
5.1 KiB
C++
Raw Normal View History

2013-05-03 02:25:50 +00:00
#pragma once
#include <queue>
#include <type_traits>
2021-08-26 15:00:27 +00:00
#include <atomic>
2021-10-06 11:02:40 +00:00
#include <condition_variable>
#include <mutex>
2013-05-03 02:25:50 +00:00
2021-10-02 07:13:14 +00:00
#include <base/MoveOrCopyIfThrow.h>
2021-08-26 15:00:27 +00:00
2013-05-03 02:25:50 +00:00
/** A very simple thread-safe queue of limited size.
2021-10-06 11:02:40 +00:00
* If you try to pop an item from an empty queue, the thread is blocked until the queue becomes nonempty or queue is finished.
* If you try to push an element into an overflowed queue, the thread is blocked until space appears in the queue or queue is finished.
2013-05-03 02:25:50 +00:00
*/
template <typename T>
class ConcurrentBoundedQueue
{
private:
std::queue<T> queue;
2021-08-26 15:00:27 +00:00
2021-10-06 11:02:40 +00:00
mutable std::mutex queue_mutex;
std::condition_variable push_condition;
std::condition_variable pop_condition;
bool is_finished = false;
size_t max_fill = 0;
2021-08-26 15:00:27 +00:00
2021-10-06 11:02:40 +00:00
template <typename ... Args>
bool emplaceImpl(bool wait_on_timeout, UInt64 timeout_milliseconds = 0, Args &&...args)
{
2021-08-26 15:00:27 +00:00
{
2021-10-06 11:02:40 +00:00
std::unique_lock<std::mutex> queue_lock(queue_mutex);
auto predicate = [&]() { return is_finished || queue.size() < max_fill; };
if (wait_on_timeout)
{
bool wait_result = push_condition.wait_for(queue_lock, std::chrono::milliseconds(timeout_milliseconds), predicate);
if (!wait_result)
return false;
}
2021-08-26 15:00:27 +00:00
else
2021-10-06 11:02:40 +00:00
{
push_condition.wait(queue_lock, [&](){ return is_finished || queue.size() < max_fill; });
}
2021-08-26 15:00:27 +00:00
2021-10-06 11:02:40 +00:00
if (is_finished)
return false;
2021-08-26 15:00:27 +00:00
2021-10-06 11:02:40 +00:00
queue.emplace(std::forward<Args>(args)...);
}
pop_condition.notify_one();
return true;
2021-08-26 15:00:27 +00:00
}
2021-10-06 11:02:40 +00:00
bool popImpl(T & x, bool wait_on_timeout, UInt64 timeout_milliseconds = 0)
2021-08-26 15:00:27 +00:00
{
{
2021-10-06 11:02:40 +00:00
std::unique_lock<std::mutex> queue_lock(queue_mutex);
auto predicate = [&]() { return is_finished || !queue.empty(); };
if (wait_on_timeout)
{
bool wait_result = pop_condition.wait_for(queue_lock, std::chrono::milliseconds(timeout_milliseconds), predicate);
if (!wait_result)
return false;
}
else
{
pop_condition.wait(queue_lock, [&](){ return is_finished || !queue.empty(); });
}
if (is_finished)
return false;
2021-08-26 15:00:27 +00:00
detail::moveOrCopyIfThrow(std::move(queue.front()), x);
queue.pop();
}
2021-10-06 11:02:40 +00:00
push_condition.notify_one();
return true;
2021-08-26 15:00:27 +00:00
}
2013-05-03 02:25:50 +00:00
public:
2021-10-06 11:02:40 +00:00
explicit ConcurrentBoundedQueue(size_t max_fill_)
: max_fill(max_fill_)
2021-03-03 19:34:25 +00:00
{}
2021-10-06 11:02:40 +00:00
/// Returns false if queue is finished
bool push(const T & x)
{
2021-10-06 11:02:40 +00:00
return emplace(x);
}
2021-10-06 11:02:40 +00:00
/// Returns false if queue is finished
2017-06-01 13:41:58 +00:00
template <typename... Args>
2021-10-06 11:02:40 +00:00
bool emplace(Args &&... args)
{
2021-10-06 11:02:40 +00:00
emplaceImpl(false /*wait on timeout*/, 0 /* timeout in milliseconds */, std::forward<Args...>(args...));
return true;
}
2021-10-06 11:02:40 +00:00
/// Returns false if queue is finished
[[nodiscard]] bool pop(T & x)
{
2021-10-06 11:02:40 +00:00
return popImpl(x, false /* wait on timeout*/);
}
2021-10-06 11:02:40 +00:00
/// Returns false if queue is finished or object was not pushed during timeout
2017-06-01 13:41:58 +00:00
bool tryPush(const T & x, UInt64 milliseconds = 0)
{
2021-10-06 11:02:40 +00:00
return emplaceImpl(true /*wait on timeout*/, milliseconds, x);
}
2021-10-06 11:02:40 +00:00
/// Returns false if queue is finished or object was not emplaced during timeout
2017-06-01 13:41:58 +00:00
template <typename... Args>
bool tryEmplace(UInt64 milliseconds, Args &&... args)
{
2021-10-06 11:02:40 +00:00
return emplaceImpl(true /*wait on timeout*/, milliseconds, std::forward<Args...>(args...));
}
2021-10-06 11:02:40 +00:00
/// Returns false if queue is finished or object was not popped during timeout
[[nodiscard]] bool tryPop(T & x, UInt64 milliseconds = 0)
{
2021-10-06 11:02:40 +00:00
return popImpl(x, true /*wait on timeout*/, milliseconds);
}
2021-10-06 11:02:40 +00:00
/// Returns size of queue
2021-08-26 15:00:27 +00:00
size_t size() const
{
2021-10-06 11:02:40 +00:00
std::lock_guard<std::mutex> lock(queue_mutex);
return queue.size();
}
2021-10-06 11:02:40 +00:00
/// Returns if queue is empty
bool empty() const
2020-03-09 02:55:28 +00:00
{
2021-10-06 11:02:40 +00:00
std::lock_guard<std::mutex> lock(queue_mutex);
2020-03-09 02:55:28 +00:00
return queue.empty();
}
2021-10-06 11:02:40 +00:00
/** Clear and finish queue
* After that push or pop operations will return false
* Returns true if queue was already finished
*/
bool finish()
2021-08-26 15:00:27 +00:00
{
2021-10-06 11:02:40 +00:00
bool was_finished_before = false;
{
std::lock_guard<std::mutex> lock(queue_mutex);
if (is_finished)
return true;
std::queue<T> empty_queue;
queue.swap(empty_queue);
was_finished_before = is_finished;
is_finished = true;
}
pop_condition.notify_all();
push_condition.notify_all();
return was_finished_before;
2021-08-26 15:00:27 +00:00
}
2021-10-06 11:02:40 +00:00
/// Returns if queue is finished
bool isFinished() const
2021-08-26 15:00:27 +00:00
{
2021-10-06 11:02:40 +00:00
std::lock_guard<std::mutex> lock(queue_mutex);
return is_finished;
2021-08-26 15:00:27 +00:00
}
2021-10-06 11:02:40 +00:00
/// Clear queue
void clear()
{
2021-10-06 11:02:40 +00:00
std::lock_guard<std::mutex> lock(queue_mutex);
if (is_finished)
return;
std::queue<T> empty_queue;
queue.swap(empty_queue);
push_condition.notify_all();
}
2013-05-03 02:25:50 +00:00
};