ClickHouse/dbms/include/DB/DataStreams/QueueBlockIOStream.h

70 lines
2.0 KiB
C++
Raw Normal View History

#pragma once
#include <limits>
#include <DB/Common/ConcurrentBoundedQueue.h>
#include <DB/DataStreams/IProfilingBlockInputStream.h>
#include <DB/DataStreams/IBlockOutputStream.h>
namespace DB
{
/** Является одновременно InputStream и OutputStream.
* При записи, кладёт блоки в очередь.
* При чтении, вынимает их из очереди.
* Используется thread-safe очередь.
* Если очередь пуста - чтение блокируется.
* Если очередь переполнена - запись блокируется.
*
* Используется для того, чтобы временно сохранить куда-то результат, и позже передать его дальше.
* Также используется для синхронизации, когда нужно из одного источника сделать несколько
* - для однопроходного выполнения сразу нескольких запросов.
* Также может использоваться для распараллеливания: несколько потоков кладут блоки в очередь, а один - вынимает.
*/
class QueueBlockIOStream : public IProfilingBlockInputStream, public IBlockOutputStream
{
public:
QueueBlockIOStream(size_t queue_size_ = std::numeric_limits<int>::max())
: queue_size(queue_size_), queue(queue_size) {}
String getName() const override { return "QueueBlockIOStream"; }
String getID() const override
{
std::stringstream res;
res << this;
return res.str();
}
void write(const Block & block) override
{
queue.push(block);
}
void cancel() override
{
IProfilingBlockInputStream::cancel();
queue.clear();
}
protected:
Block readImpl() override
{
Block res;
queue.pop(res);
return res;
}
private:
size_t queue_size;
using Queue = ConcurrentBoundedQueue<Block>;
Queue queue;
};
}