From f7fdfe4ed2b32d975a72d2948bc8d9024816bf9b Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Thu, 9 Apr 2020 20:25:44 +0300 Subject: [PATCH] Fix build. --- src/Processors/Merges/IMergingAlgorithm.h | 6 +- src/Processors/Merges/IMergingTransform.cpp | 61 +++---------- src/Processors/Merges/IMergingTransform.h | 86 ++++++++++++++----- .../Merges/MergingSortedAlgorithm.cpp | 9 +- 4 files changed, 90 insertions(+), 72 deletions(-) diff --git a/src/Processors/Merges/IMergingAlgorithm.h b/src/Processors/Merges/IMergingAlgorithm.h index 1dbe88e1370..51ee58cedd2 100644 --- a/src/Processors/Merges/IMergingAlgorithm.h +++ b/src/Processors/Merges/IMergingAlgorithm.h @@ -24,10 +24,12 @@ public: virtual void consume(Chunk chunk, size_t source_num) = 0; virtual Status merge() = 0; + IMergingAlgorithm(IMergingAlgorithm &&) = default; virtual ~IMergingAlgorithm() = default; }; -template -concept MergingAlgorithm = std::is_base_of::value && std::is_move_constructible::value; +// TODO: use when compile with clang which could support it +// template +// concept MergingAlgorithm = std::is_base_of::value && std::is_move_constructible::value; } diff --git a/src/Processors/Merges/IMergingTransform.cpp b/src/Processors/Merges/IMergingTransform.cpp index 295d7361d19..11ab063dce7 100644 --- a/src/Processors/Merges/IMergingTransform.cpp +++ b/src/Processors/Merges/IMergingTransform.cpp @@ -200,28 +200,22 @@ IProcessor::Status IMergingTransform::prepare() return Status::Ready; } - -template -IMergingTransform2::IMergingTransform2( - Algorithm algorithm, - size_t num_inputs, - const Block & input_header, - const Block & output_header, - bool have_all_inputs_) - : IProcessor(InputPorts(num_inputs, input_header), {output_header}) - , algorithm(std::move(algorithm)) - , have_all_inputs(have_all_inputs_) +IMergingTransformBase::IMergingTransformBase( + size_t num_inputs, + const Block & input_header, + const Block & output_header, + bool have_all_inputs_) + : IProcessor(InputPorts(num_inputs, input_header), {output_header}) + , have_all_inputs(have_all_inputs_) { } -template -void IMergingTransform2::onNewInput() +void IMergingTransformBase::onNewInput() { throw Exception("onNewInput is not implemented for " + getName(), ErrorCodes::NOT_IMPLEMENTED); } -template -void IMergingTransform2::addInput() +void IMergingTransformBase::addInput() { if (have_all_inputs) throw Exception("IMergingTransform already have all inputs.", ErrorCodes::LOGICAL_ERROR); @@ -230,8 +224,7 @@ void IMergingTransform2::addInput() onNewInput(); } -template -void IMergingTransform2::setHaveAllInputs() +void IMergingTransformBase::setHaveAllInputs() { if (have_all_inputs) throw Exception("IMergingTransform already have all inputs.", ErrorCodes::LOGICAL_ERROR); @@ -239,8 +232,7 @@ void IMergingTransform2::setHaveAllInputs() have_all_inputs = true; } -template -IProcessor::Status IMergingTransform2::prepareInitializeInputs() +IProcessor::Status IMergingTransformBase::prepareInitializeInputs() { /// Add information about inputs. if (input_states.empty()) @@ -296,8 +288,7 @@ IProcessor::Status IMergingTransform2::prepareInitializeInputs() return Status::Ready; } -template -IProcessor::Status IMergingTransform2::prepare() +IProcessor::Status IMergingTransformBase::prepare() { if (!have_all_inputs) return Status::NeedData; @@ -327,11 +318,8 @@ IProcessor::Status IMergingTransform2::prepare() bool is_port_full = !output.canPush(); /// Push if has data. - if (has_output_chunk && !is_port_full) - { + if (output_chunk && !is_port_full) output.push(std::move(output_chunk)); - has_output_chunk = false; - } if (!is_initialized) return prepareInitializeInputs(); @@ -365,7 +353,7 @@ IProcessor::Status IMergingTransform2::prepare() if (!chunk.hasRows() && !input.isFinished()) return Status::NeedData; - algorithm.consume(std::move(chunk), next_input_to_read); + input_chunk = std::move(chunk); } need_data = false; @@ -377,25 +365,4 @@ IProcessor::Status IMergingTransform2::prepare() return Status::Ready; } -template -void IMergingTransform2::work() -{ - if (!init_chunks.empty()) - algorithm.initialize(std::move(init_chunks)); - - IMergingAlgorithm::Status status = algorithm.merge(); - - if (status.chunk && status.chunk.hasRows()) - { - has_output_chunk = true; - output_chunk = std::move(status.chunk); - } - - if (status.required_source >= 0) - next_input_to_read = status.required_source; - - if (status.is_finished) - is_finished = true; -} - } diff --git a/src/Processors/Merges/IMergingTransform.h b/src/Processors/Merges/IMergingTransform.h index 9122e28a913..3493ea46af1 100644 --- a/src/Processors/Merges/IMergingTransform.h +++ b/src/Processors/Merges/IMergingTransform.h @@ -67,18 +67,14 @@ private: }; /// Base class for merging transforms. -template -class IMergingTransform2 : public IProcessor +class IMergingTransformBase : public IProcessor { public: - IMergingTransform2( - Algorithm algorithm, - size_t num_inputs, - const Block & input_header, - const Block & output_header, - //size_t max_block_size, - //bool use_average_block_size, /// For adaptive granularity. Return chunks with the same avg size as inputs. - bool have_all_inputs_); + IMergingTransformBase( + size_t num_inputs, + const Block & input_header, + const Block & output_header, + bool have_all_inputs_); /// Methods to add additional input port. It is possible to do only before the first call of `prepare`. void addInput(); @@ -86,27 +82,21 @@ public: void setHaveAllInputs(); Status prepare() override; - void work() override; protected: virtual void onNewInput(); /// Is called when new input is added. Only if have_all_inputs = false. virtual void onFinish() {} /// Is called when all data is processed. - /// Profile info. - Stopwatch total_stopwatch {CLOCK_MONOTONIC_COARSE}; - Algorithm algorithm; - -private: /// Processor state. Chunk output_chunk; - bool has_output_chunk = false; + Chunk input_chunk; bool is_finished = false; - bool is_initialized = false; bool need_data = false; size_t next_input_to_read = 0; - std::atomic have_all_inputs; + Chunks init_chunks; +private: struct InputState { explicit InputState(InputPort & port_) : port(port_) {} @@ -116,9 +106,63 @@ private: }; std::vector input_states; - Chunks init_chunks; + std::atomic have_all_inputs; + bool is_initialized = false; - Status prepareInitializeInputs(); + IProcessor::Status prepareInitializeInputs(); +}; + +template +class IMergingTransform2 : public IMergingTransformBase +{ +public: + IMergingTransform2( + Algorithm algorithm_, + size_t num_inputs, + const Block & input_header, + const Block & output_header, + bool have_all_inputs_) + : IMergingTransformBase(num_inputs, input_header, output_header, have_all_inputs_) + , algorithm(std::move(algorithm_)) + { + } + + void work() override + { + if (!init_chunks.empty()) + algorithm.initialize(std::move(init_chunks)); + + if (input_chunk) + algorithm.consume(std::move(input_chunk), next_input_to_read); + + IMergingAlgorithm::Status status = algorithm.merge(); + + if (status.chunk && status.chunk.hasRows()) + output_chunk = std::move(status.chunk); + + if (status.required_source >= 0) + { + next_input_to_read = status.required_source; + need_data = true; + } + + if (status.is_finished) + is_finished = true; + } + +protected: + Algorithm algorithm; + + /// Profile info. + Stopwatch total_stopwatch {CLOCK_MONOTONIC_COARSE}; + +private: + using IMergingTransformBase::output_chunk; + using IMergingTransformBase::input_chunk; + using IMergingTransformBase::is_finished; + using IMergingTransformBase::need_data; + using IMergingTransformBase::next_input_to_read; + using IMergingTransformBase::init_chunks; }; } diff --git a/src/Processors/Merges/MergingSortedAlgorithm.cpp b/src/Processors/Merges/MergingSortedAlgorithm.cpp index 556bde169d3..3be9f3f64ef 100644 --- a/src/Processors/Merges/MergingSortedAlgorithm.cpp +++ b/src/Processors/Merges/MergingSortedAlgorithm.cpp @@ -80,6 +80,11 @@ void MergingSortedAlgorithm::consume(Chunk chunk, size_t source_num) prepareChunk(chunk); source_chunks[source_num] = std::move(chunk); cursors[source_num].reset(source_chunks[source_num].getColumns(), {}); + + if (has_collation) + queue_with_collation.push(cursors[source_num]); + else + queue_without_collation.push(cursors[source_num]); } IMergingAlgorithm::Status MergingSortedAlgorithm::merge() @@ -166,9 +171,9 @@ IMergingAlgorithm::Status MergingSortedAlgorithm::insertFromChunk(size_t source_ auto num_rows = source_chunks[source_num].getNumRows(); UInt64 total_merged_rows_after_insertion = merged_data.mergedRows() + num_rows; - bool is_finished = limit && total_merged_rows_after_insertion > limit; + bool is_finished = limit && total_merged_rows_after_insertion >= limit; - if (is_finished) + if (limit && total_merged_rows_after_insertion > limit) { num_rows = total_merged_rows_after_insertion - limit; merged_data.insertFromChunk(std::move(source_chunks[source_num]), num_rows);