ClickHouse/src/Common/ConcurrentBoundedQueue.h

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

230 lines
5.8 KiB
C++
Raw Normal View History

2013-05-03 02:25:50 +00:00
#pragma once
#include <queue>
#include <type_traits>
2021-08-26 15:00:27 +00:00
#include <atomic>
2021-10-06 11:02:40 +00:00
#include <condition_variable>
#include <mutex>
2021-10-07 17:13:56 +00:00
#include <optional>
2013-05-03 02:25:50 +00:00
2021-10-02 07:13:14 +00:00
#include <base/MoveOrCopyIfThrow.h>
2021-08-26 15:00:27 +00:00
2013-05-03 02:25:50 +00:00
/** A very simple thread-safe queue of limited size.
2021-10-06 11:02:40 +00:00
* If you try to pop an item from an empty queue, the thread is blocked until the queue becomes nonempty or queue is finished.
* If you try to push an element into an overflowed queue, the thread is blocked until space appears in the queue or queue is finished.
2013-05-03 02:25:50 +00:00
*/
template <typename T>
class ConcurrentBoundedQueue
{
private:
std::queue<T> queue;
2021-08-26 15:00:27 +00:00
2021-10-06 11:02:40 +00:00
mutable std::mutex queue_mutex;
std::condition_variable push_condition;
std::condition_variable pop_condition;
bool is_finished = false;
size_t max_fill = 0;
2021-08-26 15:00:27 +00:00
2021-10-06 11:02:40 +00:00
template <typename ... Args>
2021-10-07 17:13:56 +00:00
bool emplaceImpl(std::optional<UInt64> timeout_milliseconds, Args &&...args)
2021-10-06 11:02:40 +00:00
{
2021-08-26 15:00:27 +00:00
{
2021-10-06 11:02:40 +00:00
std::unique_lock<std::mutex> queue_lock(queue_mutex);
auto predicate = [&]() { return is_finished || queue.size() < max_fill; };
2021-10-07 17:13:56 +00:00
if (timeout_milliseconds.has_value())
2021-10-06 11:02:40 +00:00
{
2021-10-07 17:13:56 +00:00
bool wait_result = push_condition.wait_for(queue_lock, std::chrono::milliseconds(timeout_milliseconds.value()), predicate);
2021-10-06 11:02:40 +00:00
if (!wait_result)
return false;
}
2021-08-26 15:00:27 +00:00
else
2021-10-06 11:02:40 +00:00
{
2021-10-08 17:18:33 +00:00
push_condition.wait(queue_lock, predicate);
2021-10-06 11:02:40 +00:00
}
2021-08-26 15:00:27 +00:00
2021-10-06 11:02:40 +00:00
if (is_finished)
return false;
2021-08-26 15:00:27 +00:00
2021-10-06 11:02:40 +00:00
queue.emplace(std::forward<Args>(args)...);
}
pop_condition.notify_one();
return true;
2021-08-26 15:00:27 +00:00
}
2021-10-07 17:13:56 +00:00
bool popImpl(T & x, std::optional<UInt64> timeout_milliseconds)
2021-08-26 15:00:27 +00:00
{
{
2021-10-06 11:02:40 +00:00
std::unique_lock<std::mutex> queue_lock(queue_mutex);
auto predicate = [&]() { return is_finished || !queue.empty(); };
2021-10-07 17:13:56 +00:00
if (timeout_milliseconds.has_value())
2021-10-06 11:02:40 +00:00
{
2021-10-07 17:13:56 +00:00
bool wait_result = pop_condition.wait_for(queue_lock, std::chrono::milliseconds(timeout_milliseconds.value()), predicate);
2021-10-06 11:02:40 +00:00
if (!wait_result)
return false;
}
else
{
2021-10-07 17:13:56 +00:00
pop_condition.wait(queue_lock, predicate);
2021-10-06 11:02:40 +00:00
}
2021-10-07 08:21:01 +00:00
if (is_finished && queue.empty())
2021-10-06 11:02:40 +00:00
return false;
2021-08-26 15:00:27 +00:00
detail::moveOrCopyIfThrow(std::move(queue.front()), x);
queue.pop();
}
2021-10-06 11:02:40 +00:00
push_condition.notify_one();
return true;
2021-08-26 15:00:27 +00:00
}
2013-05-03 02:25:50 +00:00
public:
2021-10-06 11:02:40 +00:00
explicit ConcurrentBoundedQueue(size_t max_fill_)
: max_fill(max_fill_)
2021-03-03 19:34:25 +00:00
{}
2021-10-06 11:02:40 +00:00
/// Returns false if queue is finished
2021-10-07 22:06:33 +00:00
[[nodiscard]] bool push(const T & x)
2013-05-03 02:25:50 +00:00
{
2021-10-06 11:02:40 +00:00
return emplace(x);
2013-05-03 02:25:50 +00:00
}
[[nodiscard]] bool push(T && x)
{
return emplace(std::move(x));
}
2021-10-06 11:02:40 +00:00
/// Returns false if queue is finished
2017-06-01 13:41:58 +00:00
template <typename... Args>
2021-10-07 22:06:33 +00:00
[[nodiscard]] bool emplace(Args &&... args)
2016-04-22 19:57:40 +00:00
{
2021-10-07 17:13:56 +00:00
emplaceImpl(std::nullopt /* timeout in milliseconds */, std::forward<Args...>(args...));
2021-10-06 11:02:40 +00:00
return true;
2016-04-22 19:57:40 +00:00
}
2021-10-07 17:13:56 +00:00
/// Returns false if queue is finished and empty
2021-10-06 11:02:40 +00:00
[[nodiscard]] bool pop(T & x)
2013-05-03 02:25:50 +00:00
{
2021-10-07 17:13:56 +00:00
return popImpl(x, std::nullopt /*timeout in milliseconds*/);
2013-05-03 02:25:50 +00:00
}
2021-10-06 11:02:40 +00:00
/// Returns false if queue is finished or object was not pushed during timeout
2021-10-07 22:06:33 +00:00
[[nodiscard]] bool tryPush(const T & x, UInt64 milliseconds = 0)
2013-05-03 02:25:50 +00:00
{
2021-10-07 17:13:56 +00:00
return emplaceImpl(milliseconds, x);
2013-05-03 02:25:50 +00:00
}
[[nodiscard]] bool tryPush(T && x, UInt64 milliseconds = 0)
{
return emplaceImpl(milliseconds, std::move(x));
}
2021-10-06 11:02:40 +00:00
/// Returns false if queue is finished or object was not emplaced during timeout
2017-06-01 13:41:58 +00:00
template <typename... Args>
2021-10-07 22:06:33 +00:00
[[nodiscard]] bool tryEmplace(UInt64 milliseconds, Args &&... args)
{
2021-10-07 17:13:56 +00:00
return emplaceImpl(milliseconds, std::forward<Args...>(args...));
}
2021-10-07 17:13:56 +00:00
/// Returns false if queue is (finished and empty) or (object was not popped during timeout)
2021-10-06 11:02:40 +00:00
[[nodiscard]] bool tryPop(T & x, UInt64 milliseconds = 0)
2013-05-03 02:25:50 +00:00
{
2021-10-07 17:13:56 +00:00
return popImpl(x, milliseconds);
2013-05-03 02:25:50 +00:00
}
2021-10-06 11:02:40 +00:00
/// Returns size of queue
2021-08-26 15:00:27 +00:00
size_t size() const
{
2021-10-06 11:02:40 +00:00
std::lock_guard<std::mutex> lock(queue_mutex);
return queue.size();
}
2021-10-06 11:02:40 +00:00
/// Returns if queue is empty
bool empty() const
2020-03-09 02:55:28 +00:00
{
2021-10-06 11:02:40 +00:00
std::lock_guard<std::mutex> lock(queue_mutex);
2020-03-09 02:55:28 +00:00
return queue.empty();
}
2021-10-06 11:02:40 +00:00
/** Clear and finish queue
2021-10-07 08:21:01 +00:00
* After that push operation will return false
* pop operations will return values until queue become empty
2021-10-06 11:02:40 +00:00
* Returns true if queue was already finished
*/
bool finish()
2021-08-26 15:00:27 +00:00
{
2021-10-06 11:02:40 +00:00
bool was_finished_before = false;
{
std::lock_guard<std::mutex> lock(queue_mutex);
if (is_finished)
return true;
was_finished_before = is_finished;
is_finished = true;
}
pop_condition.notify_all();
push_condition.notify_all();
return was_finished_before;
2021-08-26 15:00:27 +00:00
}
2021-10-06 11:02:40 +00:00
/// Returns if queue is finished
bool isFinished() const
2021-08-26 15:00:27 +00:00
{
2021-10-06 11:02:40 +00:00
std::lock_guard<std::mutex> lock(queue_mutex);
return is_finished;
2021-08-26 15:00:27 +00:00
}
2021-10-08 08:48:08 +00:00
/// Returns if queue is finished and empty
bool isFinishedAndEmpty() const
{
std::lock_guard<std::mutex> lock(queue_mutex);
return is_finished && queue.empty();
}
2021-10-06 11:02:40 +00:00
/// Clear queue
2013-05-03 02:25:50 +00:00
void clear()
{
2021-10-07 17:13:56 +00:00
{
std::lock_guard<std::mutex> lock(queue_mutex);
2021-10-06 11:02:40 +00:00
2021-10-07 17:13:56 +00:00
if (is_finished)
return;
2021-10-06 11:02:40 +00:00
2021-10-07 17:13:56 +00:00
std::queue<T> empty_queue;
queue.swap(empty_queue);
}
2021-10-06 11:02:40 +00:00
push_condition.notify_all();
2013-05-03 02:25:50 +00:00
}
2021-10-07 08:21:01 +00:00
/// Clear and finish queue
void clearAndFinish()
{
{
std::lock_guard<std::mutex> lock(queue_mutex);
std::queue<T> empty_queue;
queue.swap(empty_queue);
is_finished = true;
}
pop_condition.notify_all();
push_condition.notify_all();
}
2013-05-03 02:25:50 +00:00
};