#include namespace DB { namespace ErrorCodes { extern const int SIZES_OF_COLUMNS_DOESNT_MATCH; } SquashingTransform::SquashingTransform(size_t min_block_size_rows_, size_t min_block_size_bytes_, bool reserve_memory_) : min_block_size_rows(min_block_size_rows_) , min_block_size_bytes(min_block_size_bytes_) , reserve_memory(reserve_memory_) { } Block SquashingTransform::add(Block && input_block) { return addImpl(std::move(input_block)); } Block SquashingTransform::add(const Block & input_block) { return addImpl(input_block); } /* * To minimize copying, accept two types of argument: const reference for output * stream, and rvalue reference for input stream, and decide whether to copy * inside this function. This allows us not to copy Block unless we absolutely * have to. */ template Block SquashingTransform::addImpl(ReferenceType input_block) { /// End of input stream. if (!input_block) { Block to_return; std::swap(to_return, accumulated_block); return to_return; } /// Just read block is already enough. if (isEnoughSize(input_block)) { /// If no accumulated data, return just read block. if (!accumulated_block) { return std::move(input_block); } /// Return accumulated data (maybe it has small size) and place new block to accumulated data. Block to_return = std::move(input_block); std::swap(to_return, accumulated_block); return to_return; } /// Accumulated block is already enough. if (isEnoughSize(accumulated_block)) { /// Return accumulated data and place new block to accumulated data. Block to_return = std::move(input_block); std::swap(to_return, accumulated_block); return to_return; } append(std::move(input_block)); if (isEnoughSize(accumulated_block)) { Block to_return; std::swap(to_return, accumulated_block); return to_return; } /// Squashed block is not ready. return {}; } template void SquashingTransform::append(ReferenceType input_block) { if (!accumulated_block) { accumulated_block = std::move(input_block); return; } assert(blocksHaveEqualStructure(input_block, accumulated_block)); for (size_t i = 0, size = accumulated_block.columns(); i < size; ++i) { const auto source_column = input_block.getByPosition(i).column; auto mutable_column = IColumn::mutate(std::move(accumulated_block.getByPosition(i).column)); if (reserve_memory) { mutable_column->reserve(min_block_size_bytes); } mutable_column->insertRangeFrom(*source_column, 0, source_column->size()); accumulated_block.getByPosition(i).column = std::move(mutable_column); } } bool SquashingTransform::isEnoughSize(const Block & block) { size_t rows = 0; size_t bytes = 0; for (const auto & [column, type, name] : block) { if (!rows) rows = column->size(); else if (rows != column->size()) throw Exception("Sizes of columns doesn't match", ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH); bytes += column->byteSize(); } return isEnoughSize(rows, bytes); } bool SquashingTransform::isEnoughSize(size_t rows, size_t bytes) const { return (!min_block_size_rows && !min_block_size_bytes) || (min_block_size_rows && rows >= min_block_size_rows) || (min_block_size_bytes && bytes >= min_block_size_bytes); } }