From 6c2dd9b87de73ef77ae2a3f0e9cdf9f5fcaa0526 Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Tue, 18 Jun 2019 11:25:27 +0300 Subject: [PATCH] Remove copy constructor from Chunk. --- dbms/src/Processors/Chunk.cpp | 5 +++++ dbms/src/Processors/Chunk.h | 6 ++++-- dbms/src/Processors/ForkProcessor.cpp | 13 ++++++++++++- dbms/src/Processors/Formats/LazyOutputFormat.h | 8 ++++---- .../Processors/Transforms/MergeSortingTransform.cpp | 4 ++-- .../Processors/Transforms/TotalsHavingTransform.cpp | 2 +- 6 files changed, 28 insertions(+), 10 deletions(-) diff --git a/dbms/src/Processors/Chunk.cpp b/dbms/src/Processors/Chunk.cpp index 1a9db9e9ca8..8e4e4054f7b 100644 --- a/dbms/src/Processors/Chunk.cpp +++ b/dbms/src/Processors/Chunk.cpp @@ -58,6 +58,11 @@ Chunk & Chunk::operator=(Chunk && other) noexcept return *this; } +Chunk Chunk::clone() const +{ + return Chunk(getColumns(), getNumRows()); +} + void Chunk::setColumns(Columns columns_, UInt64 num_rows_) { columns = std::move(columns_); diff --git a/dbms/src/Processors/Chunk.h b/dbms/src/Processors/Chunk.h index efe6a594b74..edb0c200cdd 100644 --- a/dbms/src/Processors/Chunk.h +++ b/dbms/src/Processors/Chunk.h @@ -19,16 +19,18 @@ class Chunk { public: Chunk() = default; - Chunk(const Chunk & other) = default; + Chunk(const Chunk & other) = delete; Chunk(Chunk && other) noexcept; Chunk(Columns columns_, UInt64 num_rows_); Chunk(Columns columns_, UInt64 num_rows_, ChunkInfoPtr chunk_info_); Chunk(MutableColumns columns_, UInt64 num_rows_); Chunk(MutableColumns columns_, UInt64 num_rows_, ChunkInfoPtr chunk_info_); - Chunk & operator=(const Chunk & other) = default; + Chunk & operator=(const Chunk & other) = delete; Chunk & operator=(Chunk && other) noexcept; + Chunk clone() const; + const Columns & getColumns() const { return columns; } void setColumns(Columns columns_, UInt64 num_rows_); void setColumns(MutableColumns columns_, UInt64 num_rows_); diff --git a/dbms/src/Processors/ForkProcessor.cpp b/dbms/src/Processors/ForkProcessor.cpp index 600e1895914..30903f22433 100644 --- a/dbms/src/Processors/ForkProcessor.cpp +++ b/dbms/src/Processors/ForkProcessor.cpp @@ -12,12 +12,14 @@ ForkProcessor::Status ForkProcessor::prepare() bool all_finished = true; bool all_can_push = true; + size_t num_active_outputs = 0; for (const auto & output : outputs) { if (!output.isFinished()) { all_finished = false; + ++num_active_outputs; /// The order is important. if (!output.canPush()) @@ -55,10 +57,19 @@ ForkProcessor::Status ForkProcessor::prepare() /// Move data. auto data = input.pull(); + size_t num_processed_outputs = 0; for (auto & output : outputs) + { if (!output.isFinished()) /// Skip finished outputs. - output.push(data); /// Can push because no full or unneeded outputs. + { + ++num_processed_outputs; + if (num_processed_outputs == num_active_outputs) + output.push(std::move(data)); /// Can push because no full or unneeded outputs. + else + output.push(data.clone()); + } + } /// Now, we pulled from input. It must be empty. return Status::NeedData; diff --git a/dbms/src/Processors/Formats/LazyOutputFormat.h b/dbms/src/Processors/Formats/LazyOutputFormat.h index 205828cc926..8cb148a0cef 100644 --- a/dbms/src/Processors/Formats/LazyOutputFormat.h +++ b/dbms/src/Processors/Formats/LazyOutputFormat.h @@ -11,8 +11,8 @@ class LazyOutputFormat : public IOutputFormat { public: - explicit LazyOutputFormat(Block header) - : IOutputFormat(std::move(header), out), queue(1), finished_processing(false) {} + explicit LazyOutputFormat(const Block & header) + : IOutputFormat(header, out), queue(1), finished_processing(false) {} String getName() const override { return "LazyOutputFormat"; } @@ -27,7 +27,7 @@ public: void setRowsBeforeLimit(size_t rows_before_limit) override; protected: - void consume(Chunk chunk) override { queue.push(chunk); } + void consume(Chunk chunk) override { queue.emplace(std::move(chunk)); } void consumeTotals(Chunk chunk) override { totals = std::move(chunk); } void consumeExtremes(Chunk chunk) override { extremes = std::move(chunk); } @@ -36,7 +36,7 @@ protected: finished_processing = true; /// In case we are waiting for result. - queue.push({}); + queue.emplace(Chunk()); } private: diff --git a/dbms/src/Processors/Transforms/MergeSortingTransform.cpp b/dbms/src/Processors/Transforms/MergeSortingTransform.cpp index fb2a7c5b90f..d3d197881b7 100644 --- a/dbms/src/Processors/Transforms/MergeSortingTransform.cpp +++ b/dbms/src/Processors/Transforms/MergeSortingTransform.cpp @@ -577,7 +577,7 @@ void MergeSortingTransform::remerge() << " blocks with " << sum_rows_in_blocks << " rows) to save memory consumption"); /// NOTE Maybe concat all blocks and partial sort will be faster than merge? - MergeSorter remerge_sorter(chunks, description, max_merged_block_size, limit); + MergeSorter remerge_sorter(std::move(chunks), description, max_merged_block_size, limit); Chunks new_chunks; size_t new_sum_rows_in_blocks = 0; @@ -587,7 +587,7 @@ void MergeSortingTransform::remerge() { new_sum_rows_in_blocks += chunk.getNumRows(); new_sum_bytes_in_blocks += chunk.allocatedBytes(); - new_chunks.emplace_back(chunk); + new_chunks.emplace_back(std::move(chunk)); } LOG_DEBUG(log, "Memory usage is lowered from " diff --git a/dbms/src/Processors/Transforms/TotalsHavingTransform.cpp b/dbms/src/Processors/Transforms/TotalsHavingTransform.cpp index 8df1d0725d2..9b716aca03c 100644 --- a/dbms/src/Processors/Transforms/TotalsHavingTransform.cpp +++ b/dbms/src/Processors/Transforms/TotalsHavingTransform.cpp @@ -153,7 +153,7 @@ void TotalsHavingTransform::transform(Chunk & chunk) if (!chunk) return; - auto finalized = chunk; + auto finalized = chunk.clone(); if (final) finalizeChunk(finalized);