ClickHouse/dbms/src/Processors/Formats/LazyOutputFormat.h

69 lines
1.5 KiB
C++
Raw Normal View History

2019-03-26 18:28:37 +00:00
#pragma once
#include <Processors/Formats/IOutputFormat.h>
#include <Common/ConcurrentBoundedQueue.h>
#include <DataStreams/BlockStreamProfileInfo.h>
2019-04-05 10:52:07 +00:00
#include <IO/WriteBuffer.h>
2019-03-26 18:28:37 +00:00
namespace DB
{
class LazyOutputFormat : public IOutputFormat
{
public:
2019-06-18 08:25:27 +00:00
explicit LazyOutputFormat(const Block & header)
: IOutputFormat(header, out), queue(2), finished_processing(false) {}
2019-03-26 18:28:37 +00:00
2019-04-05 10:52:07 +00:00
String getName() const override { return "LazyOutputFormat"; }
2019-03-26 18:28:37 +00:00
Block getBlock(UInt64 milliseconds = 0);
Block getTotals();
Block getExtremes();
2019-04-05 11:43:28 +00:00
bool isFinished() { return finished_processing; }
2019-03-26 18:28:37 +00:00
BlockStreamProfileInfo & getProfileInfo() { return info; }
2019-04-08 14:55:20 +00:00
void setRowsBeforeLimit(size_t rows_before_limit) override;
2020-01-23 10:04:18 +00:00
void finish()
{
finished_processing = true;
/// Clear queue in case if somebody is waiting lazy_format to push.
queue.clear();
}
2019-03-26 18:28:37 +00:00
protected:
void consume(Chunk chunk) override
{
if (!finished_processing)
queue.emplace(std::move(chunk));
}
2019-03-26 18:28:37 +00:00
void consumeTotals(Chunk chunk) override { totals = std::move(chunk); }
void consumeExtremes(Chunk chunk) override { extremes = std::move(chunk); }
void finalize() override
{
2019-04-05 11:43:28 +00:00
finished_processing = true;
2019-03-26 18:28:37 +00:00
/// In case we are waiting for result.
2019-06-18 08:25:27 +00:00
queue.emplace(Chunk());
2019-03-26 18:28:37 +00:00
}
private:
ConcurrentBoundedQueue<Chunk> queue;
Chunk totals;
Chunk extremes;
2019-04-05 10:52:07 +00:00
/// Is not used.
static WriteBuffer out;
2019-03-26 18:28:37 +00:00
BlockStreamProfileInfo info;
2019-04-05 11:43:28 +00:00
std::atomic<bool> finished_processing;
2019-03-26 18:28:37 +00:00
};
}