ClickHouse/dbms/src/Processors/Transforms/AggregatingTransform.h

106 lines
2.8 KiB
C++
Raw Normal View History

2019-03-04 14:56:09 +00:00
#pragma once
#include <Processors/IAccumulatingTransform.h>
#include <Interpreters/Aggregator.h>
#include <IO/ReadBufferFromFile.h>
#include <Compression/CompressedReadBuffer.h>
2019-06-25 17:19:32 +00:00
#include <Common/Stopwatch.h>
2019-03-04 14:56:09 +00:00
namespace DB
{
2019-03-04 16:06:28 +00:00
/// Extra information attached to a chunk that carries aggregated data.
/// NOTE(review): the field meanings below are inferred from their names only;
/// confirm against the code that produces and consumes this info.
class AggregatedChunkInfo : public ChunkInfo
{
public:
    /// Presumably marks a chunk holding "overflow" rows; unset by default.
    bool is_overflows{false};

    /// Bucket index of the chunk; defaults to -1 (presumably "no bucket" — confirm).
    Int32 bucket_num{-1};
};
2019-03-04 14:56:09 +00:00
/// Forward declaration: lets the shared-pointer alias below be formed without
/// including the header that defines IBlockInputStream.
class IBlockInputStream;
/// Shared-ownership handle to an IBlockInputStream.
using BlockInputStreamPtr = std::shared_ptr<IBlockInputStream>;
struct AggregatingTransformParams
{
Aggregator::Params params;
Aggregator aggregator;
bool final;
2019-08-03 11:02:40 +00:00
AggregatingTransformParams(const Aggregator::Params & params_, bool final_)
: params(params_), aggregator(params), final(final_) {}
2019-03-04 14:56:09 +00:00
Block getHeader() const { return aggregator.getHeader(final); }
};
struct ManyAggregatedData
{
ManyAggregatedDataVariants variants;
2019-09-04 14:06:41 +00:00
std::vector<std::unique_ptr<std::mutex>> mutexes;
std::atomic<UInt32> num_finished = 0;
2019-09-04 14:06:41 +00:00
explicit ManyAggregatedData(size_t num_threads = 0) : variants(num_threads), mutexes(num_threads)
{
for (auto & elem : variants)
elem = std::make_shared<AggregatedDataVariants>();
2019-09-04 14:06:41 +00:00
for (auto & mut : mutexes)
mut = std::make_unique<std::mutex>();
}
};
2019-03-07 15:43:39 +00:00
/// Shared-ownership handle to the aggregation parameters.
using AggregatingTransformParamsPtr = std::shared_ptr<AggregatingTransformParams>;
/// Shared-ownership handle to the state shared between parallel aggregating processors.
using ManyAggregatedDataPtr = std::shared_ptr<ManyAggregatedData>;
2019-03-04 14:56:09 +00:00
2019-03-07 15:43:39 +00:00
class AggregatingTransform : public IProcessor
2019-03-04 14:56:09 +00:00
{
public:
AggregatingTransform(Block header, AggregatingTransformParamsPtr params_);
/// For Parallel aggregating.
AggregatingTransform(Block header, AggregatingTransformParamsPtr params_,
ManyAggregatedDataPtr many_data, size_t current_variant,
size_t temporary_data_merge_threads, size_t max_threads);
2019-03-04 14:56:09 +00:00
~AggregatingTransform() override;
String getName() const override { return "AggregatingTransform"; }
2019-03-07 15:43:39 +00:00
Status prepare() override;
2019-09-06 13:28:49 +00:00
void work() override;
2019-03-07 15:43:39 +00:00
Processors expandPipeline() override;
2019-03-04 14:56:09 +00:00
protected:
2019-09-06 13:28:49 +00:00
void consume(Chunk chunk);
2019-03-04 14:56:09 +00:00
private:
/// To read the data that was flushed into the temporary data file.
2019-03-07 15:43:39 +00:00
Processors processors;
2019-03-04 14:56:09 +00:00
AggregatingTransformParamsPtr params;
2019-03-04 16:06:28 +00:00
Logger * log = &Logger::get("AggregatingTransform");
2019-03-04 14:56:09 +00:00
ColumnRawPtrs key_columns;
Aggregator::AggregateColumns aggregate_columns;
bool no_more_keys = false;
ManyAggregatedDataPtr many_data;
AggregatedDataVariants & variants;
size_t max_threads = 1;
size_t temporary_data_merge_threads = 1;
2019-03-04 14:56:09 +00:00
/// TODO: calculate time only for aggregation.
Stopwatch watch;
UInt64 src_rows = 0;
UInt64 src_bytes = 0;
bool is_generate_initialized = false;
2019-03-07 15:43:39 +00:00
bool is_consume_finished = false;
bool is_pipeline_created = false;
Chunk current_chunk;
bool read_current_chunk = false;
2019-03-04 14:56:09 +00:00
2019-03-15 17:06:32 +00:00
bool is_consume_started = false;
2019-03-04 14:56:09 +00:00
void initGenerate();
};
}