diff --git a/src/Interpreters/Squashing.cpp b/src/Interpreters/Squashing.cpp index 7ebe4a930c9..46e21635a30 100644 --- a/src/Interpreters/Squashing.cpp +++ b/src/Interpreters/Squashing.cpp @@ -1,7 +1,6 @@ #include #include #include -#include namespace DB @@ -135,46 +134,52 @@ ApplySquashing::ApplySquashing(Block header_) { } -Block ApplySquashing::add(Chunk && input_chunk) +Chunk ApplySquashing::add(Chunk && input_chunk) { return addImpl(std::move(input_chunk)); } -Block ApplySquashing::addImpl(Chunk && input_chunk) +Chunk ApplySquashing::addImpl(Chunk && input_chunk) { if (!input_chunk.hasChunkInfo()) - return Block(); + return Chunk(); const auto *info = getInfoFromChunk(input_chunk); - for (auto & chunk : info->chunks) - append(chunk); + append(info->chunks); Block to_return; std::swap(to_return, accumulated_block); - return to_return; + return Chunk(to_return.getColumns(), to_return.rows()); } -void ApplySquashing::append(Chunk & input_chunk) +void ApplySquashing::append(const std::vector & input_chunks) { - if (input_chunk.getNumColumns() == 0) - return; - if (!accumulated_block) + std::vector mutable_columns; + size_t rows = 0; + for (const Chunk & chunk : input_chunks) + rows += chunk.getNumRows(); + + for (const auto & input_chunk : input_chunks) { - for (size_t i = 0; i < input_chunk.getNumColumns(); ++ i) + if (!accumulated_block) { - ColumnWithTypeAndName col = ColumnWithTypeAndName(input_chunk.getColumns()[i], header.getDataTypes()[i], header.getNames()[i]); - accumulated_block.insert(accumulated_block.columns(), col); + for (size_t i = 0; i < input_chunks[0].getNumColumns(); ++i) + { // We can put this part of code out of the cycle, but it will consume more memory + ColumnWithTypeAndName col = ColumnWithTypeAndName(input_chunks[0].getColumns()[i],header.getDataTypes()[i], header.getNames()[i]); + mutable_columns.push_back(IColumn::mutate(col.column)); + mutable_columns[i]->reserve(rows); + accumulated_block.insert(col); + } + continue; } - return; - } - for (size_t i = 0, size = accumulated_block.columns(); i < size; ++i) - { - const auto source_column = input_chunk.getColumns()[i]; + for (size_t i = 0, size = accumulated_block.columns(); i < size; ++i) + { + const auto source_column = input_chunk.getColumns()[i]; - auto mutable_column = IColumn::mutate(std::move(accumulated_block.getByPosition(i).column)); - mutable_column->insertRangeFrom(*source_column, 0, source_column->size()); - accumulated_block.getByPosition(i).column = std::move(mutable_column); + mutable_columns[i]->insertRangeFrom(*source_column, 0, source_column->size()); + accumulated_block.getByPosition(i).column = mutable_columns[i]->cloneFinalized(); + } } } @@ -283,7 +288,6 @@ bool PlanSquashing::isEnoughSize(const std::vector & chunks) bool PlanSquashing::isEnoughSize(size_t rows, size_t bytes) const { - LOG_TRACE(getLogger("Planning"), "rows: {}, bytes: {}", rows, bytes); return (!min_block_size_rows && !min_block_size_bytes) || (min_block_size_rows && rows >= min_block_size_rows) || (min_block_size_bytes && bytes >= min_block_size_bytes); diff --git a/src/Interpreters/Squashing.h b/src/Interpreters/Squashing.h index 0e9f001762f..d116ff1eddd 100644 --- a/src/Interpreters/Squashing.h +++ b/src/Interpreters/Squashing.h @@ -1,7 +1,5 @@ #pragma once -#include -#include #include #include #include @@ -60,17 +58,17 @@ class ApplySquashing public: explicit ApplySquashing(Block header_); - Block add(Chunk && input_chunk); + Chunk add(Chunk && input_chunk); private: Block accumulated_block; const Block header; - Block addImpl(Chunk && chunk); + Chunk addImpl(Chunk && chunk); const ChunksToSquash * getInfoFromChunk(const Chunk & chunk); - void append(Chunk & input_chunk); + void append(const std::vector & input_chunks); bool isEnoughSize(const Block & block); bool isEnoughSize(size_t rows, size_t bytes) const; diff --git a/src/Processors/Transforms/ApplySquashingTransform.h b/src/Processors/Transforms/ApplySquashingTransform.h index abb3a0aad41..e63691fcc6a 100644 --- a/src/Processors/Transforms/ApplySquashingTransform.h +++ b/src/Processors/Transforms/ApplySquashingTransform.h @@ -37,8 +37,8 @@ public: protected: void onConsume(Chunk chunk) override { - if (auto block = squashing.add(std::move(chunk))) - cur_chunk.setColumns(block.getColumns(), block.rows()); + if (auto res_chunk = squashing.add(std::move(chunk))) + cur_chunk.setColumns(res_chunk.getColumns(), res_chunk.getNumRows()); } GenerateResult onGenerate() override @@ -50,8 +50,8 @@ protected: } void onFinish() override { - auto block = squashing.add({}); - finish_chunk.setColumns(block.getColumns(), block.rows()); + auto chunk = squashing.add({}); + finish_chunk.setColumns(chunk.getColumns(), chunk.getNumRows()); } private: diff --git a/src/Processors/Transforms/PlanSquashingTransform.h b/src/Processors/Transforms/PlanSquashingTransform.h index dc5b6d669b1..7afc942a7f2 100644 --- a/src/Processors/Transforms/PlanSquashingTransform.h +++ b/src/Processors/Transforms/PlanSquashingTransform.h @@ -3,7 +3,6 @@ #include #include #include -#include "Processors/Port.h" enum PlanningStatus { diff --git a/src/Processors/Transforms/SquashingTransform.cpp b/src/Processors/Transforms/SquashingTransform.cpp index 8f7f6488d3e..a516811bf45 100644 --- a/src/Processors/Transforms/SquashingTransform.cpp +++ b/src/Processors/Transforms/SquashingTransform.cpp @@ -1,5 +1,5 @@ #include -#include +#include namespace DB { @@ -12,14 +12,16 @@ extern const int LOGICAL_ERROR; SquashingTransform::SquashingTransform( const Block & header, size_t min_block_size_rows, size_t min_block_size_bytes) : ExceptionKeepingTransform(header, header, false) - , squashing(min_block_size_rows, min_block_size_bytes) + , planSquashing(header, min_block_size_rows, min_block_size_bytes) + , applySquashing(header) { } void SquashingTransform::onConsume(Chunk chunk) { - if (auto block = squashing.add(getInputPort().getHeader().cloneWithColumns(chunk.detachColumns()))) - cur_chunk.setColumns(block.getColumns(), block.rows()); + Chunk planned_chunk = planSquashing.add(std::move(chunk)); + if (planned_chunk.hasChunkInfo()) + cur_chunk = applySquashing.add(std::move(planned_chunk)); } SquashingTransform::GenerateResult SquashingTransform::onGenerate() @@ -32,8 +34,10 @@ SquashingTransform::GenerateResult SquashingTransform::onGenerate() void SquashingTransform::onFinish() { - auto block = squashing.add({}); - finish_chunk.setColumns(block.getColumns(), block.rows()); + Chunk chunk = planSquashing.flush(); + if (chunk.hasChunkInfo()) + chunk = applySquashing.add(std::move(chunk)); + finish_chunk.setColumns(chunk.getColumns(), chunk.getNumRows()); } void SquashingTransform::work() @@ -55,7 +59,9 @@ void SquashingTransform::work() SimpleSquashingTransform::SimpleSquashingTransform( const Block & header, size_t min_block_size_rows, size_t min_block_size_bytes) - : ISimpleTransform(header, header, false), squashing(min_block_size_rows, min_block_size_bytes) + : ISimpleTransform(header, header, false) + , planSquashing(header, min_block_size_rows, min_block_size_bytes) + , applySquashing(header) { } @@ -63,16 +69,18 @@ void SimpleSquashingTransform::transform(Chunk & chunk) { if (!finished) { - if (auto block = squashing.add(getInputPort().getHeader().cloneWithColumns(chunk.detachColumns()))) - chunk.setColumns(block.getColumns(), block.rows()); + Chunk planned_chunk = planSquashing.add(std::move(chunk)); + if (planned_chunk.hasChunkInfo()) + chunk = applySquashing.add(std::move(planned_chunk)); } else { if (chunk.hasRows()) throw Exception(ErrorCodes::LOGICAL_ERROR, "Chunk expected to be empty, otherwise it will be lost"); - auto block = squashing.add({}); - chunk.setColumns(block.getColumns(), block.rows()); + chunk = planSquashing.flush(); + if (chunk.hasChunkInfo()) + chunk = applySquashing.add(std::move(chunk)); } } diff --git a/src/Processors/Transforms/SquashingTransform.h b/src/Processors/Transforms/SquashingTransform.h index c5b727ac6ec..b5b3c6616d2 100644 --- a/src/Processors/Transforms/SquashingTransform.h +++ b/src/Processors/Transforms/SquashingTransform.h @@ -24,7 +24,8 @@ protected: void onFinish() override; private: - Squashing squashing; + PlanSquashing planSquashing; + ApplySquashing applySquashing; Chunk cur_chunk; Chunk finish_chunk; }; @@ -43,7 +44,8 @@ protected: IProcessor::Status prepare() override; private: - Squashing squashing; + PlanSquashing planSquashing; + ApplySquashing applySquashing; bool finished = false; };