diff --git a/dbms/CMakeLists.txt b/dbms/CMakeLists.txt index 4e104898bee..9606fa8c0b5 100644 --- a/dbms/CMakeLists.txt +++ b/dbms/CMakeLists.txt @@ -266,6 +266,7 @@ add_library (dbms include/DB/DataStreams/CSVRowOutputStream.h include/DB/DataStreams/CSVRowInputStream.h include/DB/DataStreams/verbosePrintString.h + include/DB/DataStreams/SquashingTransform.h include/DB/DataStreams/SquashingBlockInputStream.h include/DB/DataTypes/IDataType.h include/DB/DataTypes/IDataTypeDummy.h @@ -745,6 +746,7 @@ add_library (dbms src/DataStreams/RemoteBlockInputStream.cpp src/DataStreams/BlockIO.cpp src/DataStreams/verbosePrintString.cpp + src/DataStreams/SquashingTransform.cpp src/DataStreams/SquashingBlockInputStream.cpp src/DataTypes/DataTypeString.cpp diff --git a/dbms/include/DB/DataStreams/SquashingBlockInputStream.h b/dbms/include/DB/DataStreams/SquashingBlockInputStream.h index deeff4a2aa8..44dc71ee846 100644 --- a/dbms/include/DB/DataStreams/SquashingBlockInputStream.h +++ b/dbms/include/DB/DataStreams/SquashingBlockInputStream.h @@ -1,6 +1,7 @@ #pragma once #include +#include namespace DB @@ -37,15 +38,7 @@ protected: Block readImpl() override; private: - size_t min_block_size_rows; - size_t min_block_size_bytes; - - Block accumulated_block; - bool all_read = false; - - void append(Block && block); - - bool isEnoughSize(size_t rows, size_t bytes) const; + SquashingTransform transform; }; } diff --git a/dbms/include/DB/DataStreams/SquashingTransform.h b/dbms/include/DB/DataStreams/SquashingTransform.h new file mode 100644 index 00000000000..438707eced6 --- /dev/null +++ b/dbms/include/DB/DataStreams/SquashingTransform.h @@ -0,0 +1,35 @@ +#include + + +namespace DB +{ + +class SquashingTransform +{ +public: + SquashingTransform(size_t min_block_size_rows, size_t min_block_size_bytes); + + struct Result + { + bool ready = false; + Block block; + + Result(bool ready_) : ready(ready_) {} + Result(Block && block_) : ready(true), block(std::move(block_)) {} + }; + + Result add(Block & block); + +private: + size_t min_block_size_rows; + size_t min_block_size_bytes; + + Block accumulated_block; + bool all_read = false; + + void append(Block && block); + + bool isEnoughSize(size_t rows, size_t bytes) const; +}; + +} diff --git a/dbms/src/DataStreams/SquashingBlockInputStream.cpp b/dbms/src/DataStreams/SquashingBlockInputStream.cpp index b8405be9884..4f62e2dfd36 100644 --- a/dbms/src/DataStreams/SquashingBlockInputStream.cpp +++ b/dbms/src/DataStreams/SquashingBlockInputStream.cpp @@ -5,7 +5,7 @@ namespace DB { SquashingBlockInputStream::SquashingBlockInputStream(BlockInputStreamPtr & src, size_t min_block_size_rows, size_t min_block_size_bytes) - : min_block_size_rows(min_block_size_rows), min_block_size_bytes(min_block_size_bytes) + : transform(min_block_size_rows, min_block_size_bytes) { children.emplace_back(src); } @@ -13,68 +13,12 @@ SquashingBlockInputStream::SquashingBlockInputStream(BlockInputStreamPtr & src, Block SquashingBlockInputStream::readImpl() { - if (all_read) - return {}; - - while (Block block = children[0]->read()) + while (true) { - /// Just read block is alredy enough. - if (isEnoughSize(block.rowsInFirstColumn(), block.bytes())) - { - /// If no accumulated data, return just read block. - if (!accumulated_block) - return block; - - /// Return accumulated data (may be it has small size) and place new block to accumulated data. - accumulated_block.swap(block); - return block; - } - - /// Accumulated block is already enough. - if (accumulated_block && isEnoughSize(accumulated_block.rowsInFirstColumn(), accumulated_block.bytes())) - { - /// Return accumulated data and place new block to accumulated data. - accumulated_block.swap(block); - return block; - } - - append(std::move(block)); - - if (isEnoughSize(accumulated_block.rowsInFirstColumn(), accumulated_block.bytes())) - { - Block res; - res.swap(accumulated_block); - return res; - } + SquashingTransform::Result result = transform.add(children[0]->read()); + if (result.ready) + return result.block; } - - all_read = true; - return accumulated_block; -} - - -void SquashingBlockInputStream::append(Block && block) -{ - if (!accumulated_block) - { - accumulated_block = std::move(block); - return; - } - - size_t columns = block.columns(); - size_t rows = block.rowsInFirstColumn(); - - for (size_t i = 0; i < columns; ++i) - accumulated_block.unsafeGetByPosition(i).column->insertRangeFrom( - *block.unsafeGetByPosition(i).column, 0, rows); -} - - -bool SquashingBlockInputStream::isEnoughSize(size_t rows, size_t bytes) const -{ - return (!min_block_size_rows && !min_block_size_bytes) - || (min_block_size_rows && rows >= min_block_size_rows) - || (min_block_size_bytes && bytes >= min_block_size_bytes); } } diff --git a/dbms/src/DataStreams/SquashingTransform.cpp b/dbms/src/DataStreams/SquashingTransform.cpp new file mode 100644 index 00000000000..c722aa35bdd --- /dev/null +++ b/dbms/src/DataStreams/SquashingTransform.cpp @@ -0,0 +1,82 @@ +#include + + +namespace DB +{ + +SquashingTransform::SquashingTransform(size_t min_block_size_rows, size_t min_block_size_bytes) + : min_block_size_rows(min_block_size_rows), min_block_size_bytes(min_block_size_bytes) +{ +} + + +SquashingTransform::Result SquashingTransform::add(Block & block) +{ + if (all_read) + return true; + + if (!block) + { + all_read = true; + return Result(std::move(accumulated_block)); + } + + /// Just read block is alredy enough. + if (isEnoughSize(block.rowsInFirstColumn(), block.bytes())) + { + /// If no accumulated data, return just read block. + if (!accumulated_block) + return Result(std::move(block)); + + /// Return accumulated data (may be it has small size) and place new block to accumulated data. + accumulated_block.swap(block); + return Result(std::move(block)); + } + + /// Accumulated block is already enough. + if (accumulated_block && isEnoughSize(accumulated_block.rowsInFirstColumn(), accumulated_block.bytes())) + { + /// Return accumulated data and place new block to accumulated data. + accumulated_block.swap(block); + return Result(std::move(block)); + } + + append(std::move(block)); + + if (isEnoughSize(accumulated_block.rowsInFirstColumn(), accumulated_block.bytes())) + { + Block res; + res.swap(accumulated_block); + return Result(std::move(res)); + } + + /// Squashed block is not ready. + return false; +} + + +void SquashingTransform::append(Block && block) +{ + if (!accumulated_block) + { + accumulated_block = std::move(block); + return; + } + + size_t columns = block.columns(); + size_t rows = block.rowsInFirstColumn(); + + for (size_t i = 0; i < columns; ++i) + accumulated_block.unsafeGetByPosition(i).column->insertRangeFrom( + *block.unsafeGetByPosition(i).column, 0, rows); +} + + +bool SquashingTransform::isEnoughSize(size_t rows, size_t bytes) const +{ + return (!min_block_size_rows && !min_block_size_bytes) + || (min_block_size_rows && rows >= min_block_size_rows) + || (min_block_size_bytes && bytes >= min_block_size_bytes); +} + +}