diff --git a/src/Processors/Transforms/AggregatingTransform.cpp b/src/Processors/Transforms/AggregatingTransform.cpp index 65f0612d738..cdbe194cfac 100644 --- a/src/Processors/Transforms/AggregatingTransform.cpp +++ b/src/Processors/Transforms/AggregatingTransform.cpp @@ -783,7 +783,7 @@ void AggregatingTransform::initGenerate() { /// Just a reasonable constant, matches default value for the setting `preferred_block_size_bytes` static constexpr size_t oneMB = 1024 * 1024; - return std::make_shared(header, params->params.max_block_size, oneMB); + return std::make_shared(header, params->params.max_block_size, oneMB); }); } /// AggregatingTransform::expandPipeline expects single output port. diff --git a/src/Processors/Transforms/SquashingTransform.cpp b/src/Processors/Transforms/SquashingTransform.cpp index 34b733cde5e..b5a40c75c5b 100644 --- a/src/Processors/Transforms/SquashingTransform.cpp +++ b/src/Processors/Transforms/SquashingTransform.cpp @@ -7,6 +7,7 @@ namespace DB namespace ErrorCodes { extern const int LOGICAL_ERROR; +extern const int SIZES_OF_COLUMNS_DOESNT_MATCH; } SquashingTransform::SquashingTransform( @@ -56,53 +57,170 @@ void SquashingTransform::work() } } -SimpleSquashingTransform::SimpleSquashingTransform( +SimpleSquashingChunksTransform::SimpleSquashingChunksTransform( const Block & header, size_t min_block_size_rows, size_t min_block_size_bytes) - : ISimpleTransform(header, header, false) - , squashing(header, min_block_size_rows, min_block_size_bytes) + : IInflatingTransform(header, header), squashing(min_block_size_rows, min_block_size_bytes) { } -void SimpleSquashingTransform::transform(Chunk & chunk) +void SimpleSquashingChunksTransform::consume(Chunk chunk) { - if (!finished) - { - Chunk planned_chunk = squashing.add(std::move(chunk)); - if (planned_chunk.hasChunkInfo()) - chunk = DB::Squashing::squash(std::move(planned_chunk)); - } - else - { - if (chunk.hasRows()) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Chunk expected to be empty, otherwise it will be lost"); - - chunk = squashing.flush(); - if (chunk.hasChunkInfo()) - chunk = DB::Squashing::squash(std::move(chunk)); - } + Block current_block = squashing.add(getInputPort().getHeader().cloneWithColumns(chunk.detachColumns())); + squashed_chunk.setColumns(current_block.getColumns(), current_block.rows()); } -IProcessor::Status SimpleSquashingTransform::prepare() +Chunk SimpleSquashingChunksTransform::generate() { - if (!finished && input.isFinished()) + if (squashed_chunk.empty()) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't generate chunk in SimpleSquashingChunksTransform"); + + return std::move(squashed_chunk); +} + +bool SimpleSquashingChunksTransform::canGenerate() +{ + return !squashed_chunk.empty(); +} + +Chunk SimpleSquashingChunksTransform::getRemaining() +{ + Block current_block = squashing.add({}); + squashed_chunk.setColumns(current_block.getColumns(), current_block.rows()); + return std::move(squashed_chunk); +} + +SquashingLegacy::SquashingLegacy(size_t min_block_size_rows_, size_t min_block_size_bytes_) + : min_block_size_rows(min_block_size_rows_) + , min_block_size_bytes(min_block_size_bytes_) +{ +} + +Block SquashingLegacy::add(Block && input_block) +{ + return addImpl(std::move(input_block)); +} + +Block SquashingLegacy::add(const Block & input_block) +{ + return addImpl(input_block); +} + +/* + * To minimize copying, accept two types of argument: const reference for output + * stream, and rvalue reference for input stream, and decide whether to copy + * inside this function. This allows us not to copy Block unless we absolutely + * have to. + */ +template +Block SquashingLegacy::addImpl(ReferenceType input_block) +{ + /// End of input stream. + if (!input_block) { - if (output.isFinished()) - return Status::Finished; + Block to_return; + std::swap(to_return, accumulated_block); + return to_return; + } - if (!output.canPush()) - return Status::PortFull; - - if (has_output) + /// Just read block is already enough. + if (isEnoughSize(input_block)) + { + /// If no accumulated data, return just read block. + if (!accumulated_block) { - output.pushData(std::move(output_data)); - has_output = false; - return Status::PortFull; + return std::move(input_block); } - finished = true; - /// On the next call to transform() we will return all data buffered in `squashing` (if any) - return Status::Ready; + /// Return accumulated data (maybe it has small size) and place new block to accumulated data. + Block to_return = std::move(input_block); + std::swap(to_return, accumulated_block); + return to_return; } - return ISimpleTransform::prepare(); + + /// Accumulated block is already enough. + if (isEnoughSize(accumulated_block)) + { + /// Return accumulated data and place new block to accumulated data. + Block to_return = std::move(input_block); + std::swap(to_return, accumulated_block); + return to_return; + } + + append(std::move(input_block)); + if (isEnoughSize(accumulated_block)) + { + Block to_return; + std::swap(to_return, accumulated_block); + return to_return; + } + + /// Squashed block is not ready. + return {}; } + + +template +void SquashingLegacy::append(ReferenceType input_block) +{ + if (!accumulated_block) + { + accumulated_block = std::move(input_block); + return; + } + + assert(blocksHaveEqualStructure(input_block, accumulated_block)); + + try + { + for (size_t i = 0, size = accumulated_block.columns(); i < size; ++i) + { + const auto source_column = input_block.getByPosition(i).column; + + auto mutable_column = IColumn::mutate(std::move(accumulated_block.getByPosition(i).column)); + mutable_column->insertRangeFrom(*source_column, 0, source_column->size()); + accumulated_block.getByPosition(i).column = std::move(mutable_column); + } + } + catch (...) + { + /// add() may be called again even after a previous add() threw an exception. + /// Keep accumulated_block in a valid state. + /// Seems ok to discard accumulated data because we're throwing an exception, which the caller will + /// hopefully interpret to mean "this block and all *previous* blocks are potentially lost". + accumulated_block.clear(); + throw; + } +} + + +bool SquashingLegacy::isEnoughSize(const Block & block) +{ + size_t rows = 0; + size_t bytes = 0; + + for (const auto & [column, type, name] : block) + { + if (!column) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Invalid column in block."); + + if (!rows) + rows = column->size(); + else if (rows != column->size()) + throw Exception(ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH, "Sizes of columns doesn't match"); + + bytes += column->byteSize(); + } + + return isEnoughSize(rows, bytes); +} + + +bool SquashingLegacy::isEnoughSize(size_t rows, size_t bytes) const +{ + return (!min_block_size_rows && !min_block_size_bytes) + || (min_block_size_rows && rows >= min_block_size_rows) + || (min_block_size_bytes && bytes >= min_block_size_bytes); +} + + } diff --git a/src/Processors/Transforms/SquashingTransform.h b/src/Processors/Transforms/SquashingTransform.h index c5b727ac6ec..452317e7d5e 100644 --- a/src/Processors/Transforms/SquashingTransform.h +++ b/src/Processors/Transforms/SquashingTransform.h @@ -2,6 +2,7 @@ #include #include +#include #include #include @@ -29,22 +30,51 @@ private: Chunk finish_chunk; }; -/// Doesn't care about propagating exceptions and thus doesn't throw LOGICAL_ERROR if the following transform closes its input port. -class SimpleSquashingTransform : public ISimpleTransform + +class SquashingLegacy { public: - explicit SimpleSquashingTransform(const Block & header, size_t min_block_size_rows, size_t min_block_size_bytes); + /// Conditions on rows and bytes are OR-ed. If one of them is zero, then corresponding condition is ignored. + SquashingLegacy(size_t min_block_size_rows_, size_t min_block_size_bytes_); + + /** Add next block and possibly returns squashed block. + * At end, you need to pass empty block. As the result for last (empty) block, you will get last Result with ready = true. + */ + Block add(Block && block); + Block add(const Block & block); + +private: + size_t min_block_size_rows; + size_t min_block_size_bytes; + + Block accumulated_block; + + template + Block addImpl(ReferenceType block); + + template + void append(ReferenceType block); + + bool isEnoughSize(const Block & block); + bool isEnoughSize(size_t rows, size_t bytes) const; +}; + +class SimpleSquashingChunksTransform : public IInflatingTransform +{ +public: + explicit SimpleSquashingChunksTransform(const Block & header, size_t min_block_size_rows, size_t min_block_size_bytes); String getName() const override { return "SimpleSquashingTransform"; } protected: - void transform(Chunk &) override; - - IProcessor::Status prepare() override; + void consume(Chunk chunk) override; + bool canGenerate() override; + Chunk generate() override; + Chunk getRemaining() override; private: - Squashing squashing; - - bool finished = false; + SquashingLegacy squashing; + Chunk squashed_chunk; }; + } diff --git a/src/Storages/buildQueryTreeForShard.cpp b/src/Storages/buildQueryTreeForShard.cpp index ed378169381..84ba92bba00 100644 --- a/src/Storages/buildQueryTreeForShard.cpp +++ b/src/Storages/buildQueryTreeForShard.cpp @@ -290,7 +290,7 @@ TableNodePtr executeSubqueryNode(const QueryTreeNodePtr & subquery_node, size_t min_block_size_rows = mutable_context->getSettingsRef().min_external_table_block_size_rows; size_t min_block_size_bytes = mutable_context->getSettingsRef().min_external_table_block_size_bytes; - auto squashing = std::make_shared(builder->getHeader(), min_block_size_rows, min_block_size_bytes); + auto squashing = std::make_shared(builder->getHeader(), min_block_size_rows, min_block_size_bytes); builder->resize(1); builder->addTransform(std::move(squashing));