Remove copy constructor from Chunk.

Nikolai Kochetov 2019-06-18 11:25:27 +03:00
parent 66b0ae52d5
commit 6c2dd9b87d
6 changed files with 28 additions and 10 deletions


@@ -58,6 +58,11 @@ Chunk & Chunk::operator=(Chunk && other) noexcept
     return *this;
 }
 
+Chunk Chunk::clone() const
+{
+    return Chunk(getColumns(), getNumRows());
+}
+
 void Chunk::setColumns(Columns columns_, UInt64 num_rows_)
 {
     columns = std::move(columns_);


@@ -19,16 +19,18 @@ class Chunk
 {
 public:
     Chunk() = default;
-    Chunk(const Chunk & other) = default;
+    Chunk(const Chunk & other) = delete;
     Chunk(Chunk && other) noexcept;
     Chunk(Columns columns_, UInt64 num_rows_);
     Chunk(Columns columns_, UInt64 num_rows_, ChunkInfoPtr chunk_info_);
     Chunk(MutableColumns columns_, UInt64 num_rows_);
     Chunk(MutableColumns columns_, UInt64 num_rows_, ChunkInfoPtr chunk_info_);
 
-    Chunk & operator=(const Chunk & other) = default;
+    Chunk & operator=(const Chunk & other) = delete;
     Chunk & operator=(Chunk && other) noexcept;
 
+    Chunk clone() const;
+
     const Columns & getColumns() const { return columns; }
     void setColumns(Columns columns_, UInt64 num_rows_);
     void setColumns(MutableColumns columns_, UInt64 num_rows_);
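Deleting the copy constructor and copy assignment makes Chunk a move-only type, so any duplication has to be spelled out through the new clone() method (which, as implemented above, rebuilds a Chunk from the same column pointers and row count and does not carry over chunk_info). A minimal standalone sketch of the resulting call-site rules, using a hypothetical Payload type rather than the real Chunk:

    #include <memory>
    #include <utility>
    #include <vector>

    // Hypothetical stand-in for a move-only chunk of data, mirroring the
    // deleted-copy / explicit-clone interface declared in Chunk above.
    struct Payload
    {
        std::vector<std::shared_ptr<const std::vector<int>>> columns;
        size_t num_rows = 0;

        Payload() = default;
        Payload(const Payload &) = delete;              // no implicit copies
        Payload & operator=(const Payload &) = delete;
        Payload(Payload &&) noexcept = default;         // moves stay cheap
        Payload & operator=(Payload &&) noexcept = default;

        // Explicit duplication: copies the shared pointers, not the data behind them.
        Payload clone() const
        {
            Payload res;
            res.columns = columns;
            res.num_rows = num_rows;
            return res;
        }
    };

    int main()
    {
        Payload a;
        a.columns.push_back(std::make_shared<const std::vector<int>>(std::vector<int>{1, 2, 3}));
        a.num_rows = 3;

        // Payload b = a;            // would not compile: copy constructor is deleted
        Payload b = a.clone();       // duplication must be spelled out
        Payload c = std::move(a);    // transferring ownership stays implicit-friendly
        return b.num_rows == c.num_rows ? 0 : 1;
    }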


@@ -12,12 +12,14 @@ ForkProcessor::Status ForkProcessor::prepare()
     bool all_finished = true;
     bool all_can_push = true;
+    size_t num_active_outputs = 0;
 
     for (const auto & output : outputs)
     {
         if (!output.isFinished())
         {
             all_finished = false;
+            ++num_active_outputs;
 
             /// The order is important.
             if (!output.canPush())
@@ -55,10 +57,19 @@ ForkProcessor::Status ForkProcessor::prepare()
     /// Move data.
     auto data = input.pull();
+    size_t num_processed_outputs = 0;
+
     for (auto & output : outputs)
     {
         if (!output.isFinished())  /// Skip finished outputs.
-            output.push(data);  /// Can push because no full or unneeded outputs.
+        {
+            ++num_processed_outputs;
+            if (num_processed_outputs == num_active_outputs)
+                output.push(std::move(data));  /// Can push because no full or unneeded outputs.
+            else
+                output.push(data.clone());
+        }
     }
 
     /// Now, we pulled from input. It must be empty.
     return Status::NeedData;
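The rewritten loop is where the deleted copy constructor pays off: each active output except the last receives an explicit data.clone(), and the last one takes the original by std::move, so the chunk is duplicated num_active_outputs - 1 times instead of once per output. A standalone sketch of that fan-out pattern with a hypothetical move-only Item type (not the real processor/port API):

    #include <cstdio>
    #include <string>
    #include <utility>
    #include <vector>

    // Hypothetical move-only item with explicit duplication, standing in for Chunk.
    struct Item
    {
        std::string data;

        explicit Item(std::string s) : data(std::move(s)) {}
        Item(const Item &) = delete;
        Item & operator=(const Item &) = delete;
        Item(Item &&) noexcept = default;
        Item & operator=(Item &&) noexcept = default;

        Item clone() const { return Item(data); }
    };

    // Fan the item out to num_active consumers: clone for all but the last one,
    // move the original into the last, so exactly num_active - 1 copies are made.
    void fanOut(Item item, std::vector<Item> & consumers, size_t num_active)
    {
        size_t num_processed = 0;
        for (size_t i = 0; i < num_active; ++i)
        {
            ++num_processed;
            if (num_processed == num_active)
                consumers.push_back(std::move(item));   // last consumer: no copy
            else
                consumers.push_back(item.clone());      // earlier consumers: explicit copy
        }
    }

    int main()
    {
        std::vector<Item> sinks;
        fanOut(Item("payload"), sinks, 3);
        std::printf("%zu consumers received the item\n", sinks.size());
        return 0;
    }

Moving the data before the last active output would leave it empty for the remaining ones, which is why the num_processed_outputs counter is needed.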


@@ -11,8 +11,8 @@ class LazyOutputFormat : public IOutputFormat
 {
 public:
-    explicit LazyOutputFormat(Block header)
-        : IOutputFormat(std::move(header), out), queue(1), finished_processing(false) {}
+    explicit LazyOutputFormat(const Block & header)
+        : IOutputFormat(header, out), queue(1), finished_processing(false) {}
 
     String getName() const override { return "LazyOutputFormat"; }
@@ -27,7 +27,7 @@ public:
     void setRowsBeforeLimit(size_t rows_before_limit) override;
 
 protected:
-    void consume(Chunk chunk) override { queue.push(chunk); }
+    void consume(Chunk chunk) override { queue.emplace(std::move(chunk)); }
     void consumeTotals(Chunk chunk) override { totals = std::move(chunk); }
     void consumeExtremes(Chunk chunk) override { extremes = std::move(chunk); }
@@ -36,7 +36,7 @@ protected:
         finished_processing = true;
 
         /// In case we are waiting for result.
-        queue.push({});
+        queue.emplace(Chunk());
     }
 
 private:
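With Chunk copies gone, queue.push(chunk) stops compiling for an lvalue chunk, presumably because that overload has to copy its argument; the call sites now construct the queue element from an rvalue instead. A small illustration using std::queue as a stand-in for the queue member here (hypothetical MoveOnly type, not the real class):

    #include <queue>
    #include <utility>

    struct MoveOnly
    {
        MoveOnly() = default;
        MoveOnly(const MoveOnly &) = delete;
        MoveOnly & operator=(const MoveOnly &) = delete;
        MoveOnly(MoveOnly &&) noexcept = default;
        MoveOnly & operator=(MoveOnly &&) noexcept = default;
    };

    int main()
    {
        std::queue<MoveOnly> q;
        MoveOnly m;

        // q.push(m);               // would not compile: push(const &) needs the deleted copy
        q.emplace(std::move(m));    // fine: move-constructs the element in place
        q.emplace(MoveOnly());      // also fine: an empty element, as in finish() above
        return q.size() == 2 ? 0 : 1;
    }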


@@ -577,7 +577,7 @@ void MergeSortingTransform::remerge()
         << " blocks with " << sum_rows_in_blocks << " rows) to save memory consumption");
 
     /// NOTE Maybe concat all blocks and partial sort will be faster than merge?
-    MergeSorter remerge_sorter(chunks, description, max_merged_block_size, limit);
+    MergeSorter remerge_sorter(std::move(chunks), description, max_merged_block_size, limit);
 
     Chunks new_chunks;
     size_t new_sum_rows_in_blocks = 0;
@@ -587,7 +587,7 @@
     {
         new_sum_rows_in_blocks += chunk.getNumRows();
         new_sum_bytes_in_blocks += chunk.allocatedBytes();
-        new_chunks.emplace_back(chunk);
+        new_chunks.emplace_back(std::move(chunk));
     }
 
     LOG_DEBUG(log, "Memory usage is lowered from "


@@ -153,7 +153,7 @@ void TotalsHavingTransform::transform(Chunk & chunk)
     if (!chunk)
         return;
 
-    auto finalized = chunk;
+    auto finalized = chunk.clone();
     if (final)
         finalizeChunk(finalized);
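The TotalsHavingTransform change is the same adjustment as the rest of the call sites: wherever a Chunk used to be copied implicitly (here via auto finalized = chunk;), the code now either hands the chunk off with std::move or duplicates it explicitly with clone(), which keeps every remaining copy visible at the call site.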