From 71feedea6ffb1a7277ea2edb015ba47061d73184 Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Tue, 14 Apr 2020 12:05:29 +0300 Subject: [PATCH] Remove some code. --- .../Merges/AggregatingSortedTransform.h | 4 +- .../Merges/CollapsingSortedTransform.h | 4 +- .../Merges/GraphiteRollupSortedTransform.h | 4 +- src/Processors/Merges/IMergingTransform.cpp | 209 +----------------- src/Processors/Merges/IMergingTransform.h | 107 +++------ .../Merges/MergingSortedTransform.cpp | 2 +- .../Merges/MergingSortedTransform.h | 2 +- .../Merges/ReplacingSortedTransform.h | 4 +- .../Merges/SummingSortedTransform.h | 4 +- .../Merges/VersionedCollapsingTransform.h | 4 +- 10 files changed, 48 insertions(+), 296 deletions(-) diff --git a/src/Processors/Merges/AggregatingSortedTransform.h b/src/Processors/Merges/AggregatingSortedTransform.h index 89e22173a13..4a70f3e0128 100644 --- a/src/Processors/Merges/AggregatingSortedTransform.h +++ b/src/Processors/Merges/AggregatingSortedTransform.h @@ -14,13 +14,13 @@ class ColumnAggregateFunction; * corresponding to a one value of the primary key. For columns that are not part of the primary key and which do not have the AggregateFunction type, * when merged, the first value is selected. */ -class AggregatingSortedTransform : public IMergingTransform2 +class AggregatingSortedTransform : public IMergingTransform { public: AggregatingSortedTransform( const Block & header, size_t num_inputs, SortDescription description_, size_t max_block_size) - : IMergingTransform2( + : IMergingTransform( num_inputs, header, header, true, header, num_inputs, diff --git a/src/Processors/Merges/CollapsingSortedTransform.h b/src/Processors/Merges/CollapsingSortedTransform.h index 5a9ed16ea2a..cc59bac324c 100644 --- a/src/Processors/Merges/CollapsingSortedTransform.h +++ b/src/Processors/Merges/CollapsingSortedTransform.h @@ -18,7 +18,7 @@ namespace DB * If negative by 1 is greater than positive rows, then only the first negative row is written. * Otherwise, a logical error. */ -class CollapsingSortedTransform final : public IMergingTransform2 +class CollapsingSortedTransform final : public IMergingTransform { public: CollapsingSortedTransform( @@ -29,7 +29,7 @@ public: size_t max_block_size, WriteBuffer * out_row_sources_buf_ = nullptr, bool use_average_block_sizes = false) - : IMergingTransform2( + : IMergingTransform( num_inputs, header, header, true, header, num_inputs, diff --git a/src/Processors/Merges/GraphiteRollupSortedTransform.h b/src/Processors/Merges/GraphiteRollupSortedTransform.h index 0d8493e1eb1..e610903ae5d 100644 --- a/src/Processors/Merges/GraphiteRollupSortedTransform.h +++ b/src/Processors/Merges/GraphiteRollupSortedTransform.h @@ -16,14 +16,14 @@ namespace DB * merge `value` values using the specified aggregate functions, * as well as keeping the maximum value of the `version` column. */ -class GraphiteRollupSortedTransform : public IMergingTransform2 +class GraphiteRollupSortedTransform : public IMergingTransform { public: GraphiteRollupSortedTransform( const Block & header, size_t num_inputs, SortDescription description_, size_t max_block_size, Graphite::Params params_, time_t time_of_merge_) - : IMergingTransform2( + : IMergingTransform( num_inputs, header, header, true, header, num_inputs, diff --git a/src/Processors/Merges/IMergingTransform.cpp b/src/Processors/Merges/IMergingTransform.cpp index 11ab063dce7..0dc4cd41991 100644 --- a/src/Processors/Merges/IMergingTransform.cpp +++ b/src/Processors/Merges/IMergingTransform.cpp @@ -1,5 +1,4 @@ #include -#include namespace DB { @@ -10,196 +9,6 @@ namespace ErrorCodes extern const int NOT_IMPLEMENTED; } -IMergingTransform::IMergingTransform( - size_t num_inputs, - const Block & input_header, - const Block & output_header, - bool have_all_inputs_) - : IProcessor(InputPorts(num_inputs, input_header), {output_header}) - , have_all_inputs(have_all_inputs_) -{ -} - -void IMergingTransform::onNewInput() -{ - throw Exception("onNewInput is not implemented for " + getName(), ErrorCodes::NOT_IMPLEMENTED); -} - -void IMergingTransform::addInput() -{ - if (have_all_inputs) - throw Exception("IMergingTransform already have all inputs.", ErrorCodes::LOGICAL_ERROR); - - inputs.emplace_back(outputs.front().getHeader(), this); - onNewInput(); -} - -void IMergingTransform::setHaveAllInputs() -{ - if (have_all_inputs) - throw Exception("IMergingTransform already have all inputs.", ErrorCodes::LOGICAL_ERROR); - - have_all_inputs = true; -} - -void IMergingTransform::requestDataForInput(size_t input_number) -{ - if (need_data) - throw Exception("Data was requested for several inputs in IMergingTransform:" - " " + std::to_string(next_input_to_read) + " and " + std::to_string(input_number), - ErrorCodes::LOGICAL_ERROR); - - need_data = true; - next_input_to_read = input_number; -} - -void IMergingTransform::prepareOutputChunk(MergedData & merged_data) -{ - if (need_data) - return; - - has_output_chunk = (is_finished && merged_data.mergedRows()) || merged_data.hasEnoughRows(); - if (has_output_chunk) - output_chunk = merged_data.pull(); -} - -IProcessor::Status IMergingTransform::prepareInitializeInputs() -{ - /// Add information about inputs. - if (input_states.empty()) - { - input_states.reserve(inputs.size()); - for (auto & input : inputs) - input_states.emplace_back(input); - } - - /// Check for inputs we need. - bool all_inputs_has_data = true; - auto it = inputs.begin(); - for (size_t i = 0; it != inputs.end(); ++i, ++it) - { - auto & input = *it; - if (input.isFinished()) - continue; - - if (input_states[i].is_initialized) - { - // input.setNotNeeded(); - continue; - } - - input.setNeeded(); - - if (!input.hasData()) - { - all_inputs_has_data = false; - continue; - } - - auto chunk = input.pull(); - if (!chunk.hasRows()) - { - - if (!input.isFinished()) - all_inputs_has_data = false; - - continue; - } - - consume(std::move(chunk), i); - input_states[i].is_initialized = true; - } - - if (!all_inputs_has_data) - return Status::NeedData; - - initializeInputs(); - - is_initialized = true; - return Status::Ready; -} - - -IProcessor::Status IMergingTransform::prepare() -{ - if (!have_all_inputs) - return Status::NeedData; - - auto & output = outputs.front(); - - /// Special case for no inputs. - if (inputs.empty()) - { - output.finish(); - onFinish(); - return Status::Finished; - } - - /// Check can output. - - if (output.isFinished()) - { - for (auto & in : inputs) - in.close(); - - onFinish(); - return Status::Finished; - } - - /// Do not disable inputs, so it will work in the same way as with AsynchronousBlockInputStream, like before. - bool is_port_full = !output.canPush(); - - /// Push if has data. - if (has_output_chunk && !is_port_full) - { - output.push(std::move(output_chunk)); - has_output_chunk = false; - } - - if (!is_initialized) - return prepareInitializeInputs(); - - if (is_finished) - { - - if (is_port_full) - return Status::PortFull; - - for (auto & input : inputs) - input.close(); - - outputs.front().finish(); - - onFinish(); - return Status::Finished; - } - - if (need_data) - { - auto & input = input_states[next_input_to_read].port; - if (!input.isFinished()) - { - input.setNeeded(); - - if (!input.hasData()) - return Status::NeedData; - - auto chunk = input.pull(); - if (!chunk.hasRows() && !input.isFinished()) - return Status::NeedData; - - consume(std::move(chunk), next_input_to_read); - } - - need_data = false; - } - - if (is_port_full) - return Status::PortFull; - - return Status::Ready; -} - IMergingTransformBase::IMergingTransformBase( size_t num_inputs, const Block & input_header, @@ -241,7 +50,7 @@ IProcessor::Status IMergingTransformBase::prepareInitializeInputs() for (auto & input : inputs) input_states.emplace_back(input); - init_chunks.resize(inputs.size()); + state.init_chunks.resize(inputs.size()); } /// Check for inputs we need. @@ -277,7 +86,7 @@ IProcessor::Status IMergingTransformBase::prepareInitializeInputs() continue; } - init_chunks[i] = std::move(chunk); + state.init_chunks[i] = std::move(chunk); input_states[i].is_initialized = true; } @@ -318,13 +127,13 @@ IProcessor::Status IMergingTransformBase::prepare() bool is_port_full = !output.canPush(); /// Push if has data. - if (output_chunk && !is_port_full) - output.push(std::move(output_chunk)); + if (state.output_chunk && !is_port_full) + output.push(std::move(state.output_chunk)); if (!is_initialized) return prepareInitializeInputs(); - if (is_finished) + if (state.is_finished) { if (is_port_full) @@ -339,9 +148,9 @@ IProcessor::Status IMergingTransformBase::prepare() return Status::Finished; } - if (need_data) + if (state.need_data) { - auto & input = input_states[next_input_to_read].port; + auto & input = input_states[state.next_input_to_read].port; if (!input.isFinished()) { input.setNeeded(); @@ -353,10 +162,10 @@ IProcessor::Status IMergingTransformBase::prepare() if (!chunk.hasRows() && !input.isFinished()) return Status::NeedData; - input_chunk = std::move(chunk); + state.input_chunk = std::move(chunk); } - need_data = false; + state.need_data = false; } if (is_port_full) diff --git a/src/Processors/Merges/IMergingTransform.h b/src/Processors/Merges/IMergingTransform.h index 260f57806a7..2c5213d255b 100644 --- a/src/Processors/Merges/IMergingTransform.h +++ b/src/Processors/Merges/IMergingTransform.h @@ -7,66 +7,8 @@ namespace DB { -class MergedData; - -/// Base class for merging transforms. -class IMergingTransform : public IProcessor -{ -public: - IMergingTransform( - size_t num_inputs, - const Block & input_header, - const Block & output_header, - //size_t max_block_size, - //bool use_average_block_size, /// For adaptive granularity. Return chunks with the same avg size as inputs. - bool have_all_inputs_); - - /// Methods to add additional input port. It is possible to do only before the first call of `prepare`. - void addInput(); - /// Need to be called after all inputs are added. (only if have_all_inputs was not specified). - void setHaveAllInputs(); - - Status prepare() override; - -protected: - - virtual void onNewInput(); /// Is called when new input is added. To initialize input's data. - virtual void initializeInputs() = 0; /// Is called after first chunk was read for every input. - virtual void consume(Chunk chunk, size_t input_number) = 0; /// Is called after chunk was consumed from input. - virtual void onFinish() {} /// Is called when all data is processed. - - void requestDataForInput(size_t input_number); /// Call it to say that next chunk of data is required for input. - void prepareOutputChunk(MergedData & merged_data); /// Moves chunk from merged_data to output_chunk if needed. - - /// Profile info. - Stopwatch total_stopwatch {CLOCK_MONOTONIC_COARSE}; - - Chunk output_chunk; - bool has_output_chunk = false; - bool is_finished = false; - -private: - /// Processor state. - bool is_initialized = false; - bool need_data = false; - size_t next_input_to_read = 0; - - std::atomic have_all_inputs; - - struct InputState - { - explicit InputState(InputPort & port_) : port(port_) {} - - InputPort & port; - bool is_initialized = false; - }; - - std::vector input_states; - - Status prepareInitializeInputs(); -}; - -/// Base class for merging transforms. +/// Base class for IMergingTransform. +/// It is needed to extract all non-template methods in single translation unit. class IMergingTransformBase : public IProcessor { public: @@ -88,13 +30,18 @@ protected: virtual void onFinish() {} /// Is called when all data is processed. /// Processor state. - Chunk output_chunk; - Chunk input_chunk; - bool is_finished = false; - bool need_data = false; - size_t next_input_to_read = 0; + struct State + { + Chunk output_chunk; + Chunk input_chunk; + bool is_finished = false; + bool need_data = false; + size_t next_input_to_read = 0; - Chunks init_chunks; + Chunks init_chunks; + }; + + State state; private: struct InputState @@ -112,12 +59,13 @@ private: IProcessor::Status prepareInitializeInputs(); }; +/// Implementation of MergingTransform using IMergingAlgorithm. template -class IMergingTransform2 : public IMergingTransformBase +class IMergingTransform : public IMergingTransformBase { public: template - IMergingTransform2( + IMergingTransform( size_t num_inputs, const Block & input_header, const Block & output_header, @@ -130,25 +78,25 @@ public: void work() override { - if (!init_chunks.empty()) - algorithm.initialize(std::move(init_chunks)); + if (!state.init_chunks.empty()) + algorithm.initialize(std::move(state.init_chunks)); - if (input_chunk) - algorithm.consume(std::move(input_chunk), next_input_to_read); + if (state.input_chunk) + algorithm.consume(std::move(state.input_chunk), state.next_input_to_read); IMergingAlgorithm::Status status = algorithm.merge(); if (status.chunk && status.chunk.hasRows()) - output_chunk = std::move(status.chunk); + state.output_chunk = std::move(status.chunk); if (status.required_source >= 0) { - next_input_to_read = status.required_source; - need_data = true; + state.next_input_to_read = status.required_source; + state.need_data = true; } if (status.is_finished) - is_finished = true; + state.is_finished = true; } protected: @@ -158,12 +106,7 @@ protected: Stopwatch total_stopwatch {CLOCK_MONOTONIC_COARSE}; private: - using IMergingTransformBase::output_chunk; - using IMergingTransformBase::input_chunk; - using IMergingTransformBase::is_finished; - using IMergingTransformBase::need_data; - using IMergingTransformBase::next_input_to_read; - using IMergingTransformBase::init_chunks; + using IMergingTransformBase::state; }; } diff --git a/src/Processors/Merges/MergingSortedTransform.cpp b/src/Processors/Merges/MergingSortedTransform.cpp index 68af48062ba..1bd0b289529 100644 --- a/src/Processors/Merges/MergingSortedTransform.cpp +++ b/src/Processors/Merges/MergingSortedTransform.cpp @@ -18,7 +18,7 @@ MergingSortedTransform::MergingSortedTransform( bool quiet_, bool use_average_block_sizes, bool have_all_inputs_) - : IMergingTransform2( + : IMergingTransform( num_inputs, header, header, have_all_inputs_, header, num_inputs, diff --git a/src/Processors/Merges/MergingSortedTransform.h b/src/Processors/Merges/MergingSortedTransform.h index 8763fbe1aa2..9e783a0730a 100644 --- a/src/Processors/Merges/MergingSortedTransform.h +++ b/src/Processors/Merges/MergingSortedTransform.h @@ -8,7 +8,7 @@ namespace DB { /// Merges several sorted inputs into one sorted output. -class MergingSortedTransform final : public IMergingTransform2 +class MergingSortedTransform final : public IMergingTransform { public: MergingSortedTransform( diff --git a/src/Processors/Merges/ReplacingSortedTransform.h b/src/Processors/Merges/ReplacingSortedTransform.h index a6c36cedb71..b36db3288cc 100644 --- a/src/Processors/Merges/ReplacingSortedTransform.h +++ b/src/Processors/Merges/ReplacingSortedTransform.h @@ -11,7 +11,7 @@ namespace DB * For each group of consecutive identical values of the primary key (the columns by which the data is sorted), * keeps row with max `version` value. */ -class ReplacingSortedTransform final : public IMergingTransform2 +class ReplacingSortedTransform final : public IMergingTransform { public: ReplacingSortedTransform( @@ -20,7 +20,7 @@ public: size_t max_block_size, WriteBuffer * out_row_sources_buf_ = nullptr, bool use_average_block_sizes = false) - : IMergingTransform2( + : IMergingTransform( num_inputs, header, header, true, header, num_inputs, diff --git a/src/Processors/Merges/SummingSortedTransform.h b/src/Processors/Merges/SummingSortedTransform.h index 37859e1b88b..7b7f688a7dd 100644 --- a/src/Processors/Merges/SummingSortedTransform.h +++ b/src/Processors/Merges/SummingSortedTransform.h @@ -11,7 +11,7 @@ namespace DB * collapses them into one row, summing all the numeric columns except the primary key. * If in all numeric columns, except for the primary key, the result is zero, it deletes the row. */ -class SummingSortedTransform final : public IMergingTransform2 +class SummingSortedTransform final : public IMergingTransform { public: @@ -21,7 +21,7 @@ public: /// List of columns to be summed. If empty, all numeric columns that are not in the description are taken. const Names & column_names_to_sum, size_t max_block_size) - : IMergingTransform2( + : IMergingTransform( num_inputs, header, header, true, header, num_inputs, diff --git a/src/Processors/Merges/VersionedCollapsingTransform.h b/src/Processors/Merges/VersionedCollapsingTransform.h index 31b5673ee4d..efd5e96fac7 100644 --- a/src/Processors/Merges/VersionedCollapsingTransform.h +++ b/src/Processors/Merges/VersionedCollapsingTransform.h @@ -12,7 +12,7 @@ namespace DB * (the columns by which the data is sorted, including specially specified version column), * merges any pair of consecutive rows with opposite sign. */ -class VersionedCollapsingTransform final : public IMergingTransform2 +class VersionedCollapsingTransform final : public IMergingTransform { public: /// Don't need version column. It's in primary key. @@ -22,7 +22,7 @@ public: size_t max_block_size, WriteBuffer * out_row_sources_buf_ = nullptr, bool use_average_block_sizes = false) - : IMergingTransform2( + : IMergingTransform( num_inputs, header, header, true, header, num_inputs,