diff --git a/src/Interpreters/Squashing.cpp b/src/Interpreters/Squashing.cpp index 855bf32abe9..3872c2ba6b9 100644 --- a/src/Interpreters/Squashing.cpp +++ b/src/Interpreters/Squashing.cpp @@ -1,6 +1,7 @@ #include #include #include +#include "Columns/IColumn.h" namespace DB @@ -11,124 +12,6 @@ namespace ErrorCodes extern const int LOGICAL_ERROR; } -Squashing::Squashing(size_t min_block_size_rows_, size_t min_block_size_bytes_) - : min_block_size_rows(min_block_size_rows_) - , min_block_size_bytes(min_block_size_bytes_) -{ -} - -Block Squashing::add(Block && input_block) -{ - return addImpl(std::move(input_block)); -} - -Block Squashing::add(const Block & input_block) -{ - return addImpl(input_block); -} - -/* - * To minimize copying, accept two types of argument: const reference for output - * stream, and rvalue reference for input stream, and decide whether to copy - * inside this function. This allows us not to copy Block unless we absolutely - * have to. - */ -template -Block Squashing::addImpl(ReferenceType input_block) -{ - /// End of input stream. - if (!input_block) - { - Block to_return; - std::swap(to_return, accumulated_block); - return to_return; - } - - /// Just read block is already enough. - if (isEnoughSize(input_block)) - { - /// If no accumulated data, return just read block. - if (!accumulated_block) - { - return std::move(input_block); - } - - /// Return accumulated data (maybe it has small size) and place new block to accumulated data. - Block to_return = std::move(input_block); - std::swap(to_return, accumulated_block); - return to_return; - } - - /// Accumulated block is already enough. - if (isEnoughSize(accumulated_block)) - { - /// Return accumulated data and place new block to accumulated data. - Block to_return = std::move(input_block); - std::swap(to_return, accumulated_block); - return to_return; - } - - append(std::move(input_block)); - if (isEnoughSize(accumulated_block)) - { - Block to_return; - std::swap(to_return, accumulated_block); - return to_return; - } - - /// Squashed block is not ready. - return {}; -} - - -template -void Squashing::append(ReferenceType input_block) -{ - if (!accumulated_block) - { - accumulated_block = std::move(input_block); - return; - } - - assert(blocksHaveEqualStructure(input_block, accumulated_block)); - - for (size_t i = 0, size = accumulated_block.columns(); i < size; ++i) - { - const auto source_column = input_block.getByPosition(i).column; - - auto mutable_column = IColumn::mutate(std::move(accumulated_block.getByPosition(i).column)); - mutable_column->insertRangeFrom(*source_column, 0, source_column->size()); - accumulated_block.getByPosition(i).column = std::move(mutable_column); - } -} - - -bool Squashing::isEnoughSize(const Block & block) -{ - size_t rows = 0; - size_t bytes = 0; - - for (const auto & [column, type, name] : block) - { - if (!rows) - rows = column->size(); - else if (rows != column->size()) - throw Exception(ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH, "Sizes of columns doesn't match"); - - bytes += column->byteSize(); - } - - return isEnoughSize(rows, bytes); -} - - -bool Squashing::isEnoughSize(size_t rows, size_t bytes) const -{ - return (!min_block_size_rows && !min_block_size_bytes) - || (min_block_size_rows && rows >= min_block_size_rows) - || (min_block_size_bytes && bytes >= min_block_size_bytes); -} - ApplySquashing::ApplySquashing(Block header_) : header(std::move(header_)) { @@ -187,10 +70,9 @@ const ChunksToSquash* ApplySquashing::getInfoFromChunk(const Chunk & chunk) return agg_info; } -PlanSquashing::PlanSquashing(Block header_, size_t min_block_size_rows_, size_t min_block_size_bytes_) +PlanSquashing::PlanSquashing(size_t min_block_size_rows_, size_t min_block_size_bytes_) : min_block_size_rows(min_block_size_rows_) , min_block_size_bytes(min_block_size_bytes_) - , header(std::move(header_)) { } @@ -199,7 +81,7 @@ Chunk PlanSquashing::flush() return convertToChunk(std::move(chunks_to_merge_vec)); } -Chunk PlanSquashing::add(Chunk & input_chunk) +Chunk PlanSquashing::add(Chunk && input_chunk) { if (!input_chunk) return {}; @@ -260,7 +142,8 @@ Chunk PlanSquashing::convertToChunk(std::vector && chunks) chunks.clear(); - return Chunk(header.cloneEmptyColumns(), 0, info); + Columns cols = {}; + return Chunk(cols, 0, info); } void PlanSquashing::expandCurrentSize(size_t rows, size_t bytes) diff --git a/src/Interpreters/Squashing.h b/src/Interpreters/Squashing.h index 0e844c4912b..d9d430c1835 100644 --- a/src/Interpreters/Squashing.h +++ b/src/Interpreters/Squashing.h @@ -25,33 +25,6 @@ struct ChunksToSquash : public ChunkInfo * * Order of data is kept. */ -class Squashing -{ -public: - /// Conditions on rows and bytes are OR-ed. If one of them is zero, then corresponding condition is ignored. - Squashing(size_t min_block_size_rows_, size_t min_block_size_bytes_); - - /** Add next block and possibly returns squashed block. - * At end, you need to pass empty block. As the result for last (empty) block, you will get last Result with ready = true. - */ - Block add(Block && block); - Block add(const Block & block); - -private: - size_t min_block_size_rows; - size_t min_block_size_bytes; - - Block accumulated_block; - - template - Block addImpl(ReferenceType block); - - template - void append(ReferenceType block); - - bool isEnoughSize(const Block & block); - bool isEnoughSize(size_t rows, size_t bytes) const; -}; class ApplySquashing { @@ -75,9 +48,9 @@ private: class PlanSquashing { public: - PlanSquashing(Block header_, size_t min_block_size_rows_, size_t min_block_size_bytes_); + PlanSquashing(size_t min_block_size_rows_, size_t min_block_size_bytes_); - Chunk add(Chunk & input_chunk); + Chunk add(Chunk && input_chunk); Chunk flush(); bool isDataLeft() { @@ -95,7 +68,7 @@ private: size_t min_block_size_rows; size_t min_block_size_bytes; - const Block header; + // const Block header; CurrentSize accumulated_size; void expandCurrentSize(size_t rows, size_t bytes); diff --git a/src/Processors/Transforms/PlanSquashingTransform.cpp b/src/Processors/Transforms/PlanSquashingTransform.cpp index 1384f760d48..96f41e37d2f 100644 --- a/src/Processors/Transforms/PlanSquashingTransform.cpp +++ b/src/Processors/Transforms/PlanSquashingTransform.cpp @@ -11,7 +11,7 @@ namespace ErrorCodes } PlanSquashingTransform::PlanSquashingTransform(const Block & header, size_t min_block_size_rows, size_t min_block_size_bytes, size_t num_ports) - : IProcessor(InputPorts(num_ports, header), OutputPorts(num_ports, header)), balance(header, min_block_size_rows, min_block_size_bytes) + : IProcessor(InputPorts(num_ports, header), OutputPorts(num_ports, header)), balance(min_block_size_rows, min_block_size_bytes) { } @@ -134,7 +134,7 @@ IProcessor::Status PlanSquashingTransform::waitForDataIn() void PlanSquashingTransform::transform(Chunk & chunk_) { - Chunk res_chunk = balance.add(chunk_); + Chunk res_chunk = balance.add(std::move(chunk_)); std::swap(res_chunk, chunk_); } diff --git a/src/Processors/Transforms/SquashingTransform.cpp b/src/Processors/Transforms/SquashingTransform.cpp index 67358316d48..6f7c877b2f3 100644 --- a/src/Processors/Transforms/SquashingTransform.cpp +++ b/src/Processors/Transforms/SquashingTransform.cpp @@ -12,14 +12,14 @@ extern const int LOGICAL_ERROR; SquashingTransform::SquashingTransform( const Block & header, size_t min_block_size_rows, size_t min_block_size_bytes) : ExceptionKeepingTransform(header, header, false) - , planSquashing(header, min_block_size_rows, min_block_size_bytes) + , planSquashing(min_block_size_rows, min_block_size_bytes) , applySquashing(header) { } void SquashingTransform::onConsume(Chunk chunk) { - Chunk planned_chunk = planSquashing.add(chunk); + Chunk planned_chunk = planSquashing.add(std::move(chunk)); if (planned_chunk.hasChunkInfo()) cur_chunk = applySquashing.add(std::move(planned_chunk)); } @@ -60,7 +60,7 @@ void SquashingTransform::work() SimpleSquashingTransform::SimpleSquashingTransform( const Block & header, size_t min_block_size_rows, size_t min_block_size_bytes) : ISimpleTransform(header, header, false) - , planSquashing(header, min_block_size_rows, min_block_size_bytes) + , planSquashing(min_block_size_rows, min_block_size_bytes) , applySquashing(header) { } @@ -69,7 +69,7 @@ void SimpleSquashingTransform::transform(Chunk & chunk) { if (!finished) { - Chunk planned_chunk = planSquashing.add(chunk); + Chunk planned_chunk = planSquashing.add(std::move(chunk)); if (planned_chunk.hasChunkInfo()) chunk = applySquashing.add(std::move(planned_chunk)); } diff --git a/src/Server/TCPHandler.cpp b/src/Server/TCPHandler.cpp index 9f14facdf8f..476c4dd372b 100644 --- a/src/Server/TCPHandler.cpp +++ b/src/Server/TCPHandler.cpp @@ -885,13 +885,21 @@ AsynchronousInsertQueue::PushResult TCPHandler::processAsyncInsertQuery(Asynchro using PushResult = AsynchronousInsertQueue::PushResult; startInsertQuery(); - Squashing squashing(0, query_context->getSettingsRef().async_insert_max_data_size); + PlanSquashing plan_squashing(0, query_context->getSettingsRef().async_insert_max_data_size); + ApplySquashing apply_squashing(state.input_header); while (readDataNext()) { - auto result = squashing.add(std::move(state.block_for_insert)); - if (result) + auto planned_chunk = plan_squashing.add({state.block_for_insert.getColumns(), state.block_for_insert.rows()}); + Chunk result_chunk; + if (planned_chunk.hasChunkInfo()) + result_chunk = apply_squashing.add(std::move(planned_chunk)); + if (result_chunk) { + ColumnsWithTypeAndName cols; + for (size_t j = 0; j < result_chunk.getNumColumns(); ++ j) + cols.push_back(ColumnWithTypeAndName(result_chunk.getColumns()[j], state.input_header.getDataTypes()[j], state.input_header.getNames()[j])); + auto result = Block(cols); return PushResult { .status = PushResult::TOO_MUCH_DATA, @@ -900,7 +908,14 @@ AsynchronousInsertQueue::PushResult TCPHandler::processAsyncInsertQuery(Asynchro } } - auto result = squashing.add({}); + auto planned_chunk = plan_squashing.flush(); + Chunk result_chunk; + if (planned_chunk.hasChunkInfo()) + result_chunk = apply_squashing.add(std::move(planned_chunk)); + ColumnsWithTypeAndName cols; + for (size_t j = 0; j < result_chunk.getNumColumns(); ++ j) + cols.push_back(ColumnWithTypeAndName(result_chunk.getColumns()[j], state.input_header.getDataTypes()[j], state.input_header.getNames()[j])); + auto result = Block(cols); return insert_queue.pushQueryWithBlock(state.parsed_query, std::move(result), query_context); } diff --git a/src/Storages/MergeTree/MutateTask.cpp b/src/Storages/MergeTree/MutateTask.cpp index 8052ee8f630..f7a4651f6fd 100644 --- a/src/Storages/MergeTree/MutateTask.cpp +++ b/src/Storages/MergeTree/MutateTask.cpp @@ -28,6 +28,7 @@ #include #include #include +#include namespace ProfileEvents @@ -1266,7 +1267,8 @@ private: ProjectionNameToItsBlocks projection_parts; std::move_iterator projection_parts_iterator; - std::vector projection_squashes; + std::vector projection_squash_plannings; + std::vector projection_squashes; const ProjectionsDescription & projections; ExecutableTaskPtr merge_projection_parts_task_ptr; @@ -1285,7 +1287,8 @@ void PartMergerWriter::prepare() for (size_t i = 0, size = ctx->projections_to_build.size(); i < size; ++i) { // We split the materialization into multiple stages similar to the process of INSERT SELECT query. - projection_squashes.emplace_back(settings.min_insert_block_size_rows, settings.min_insert_block_size_bytes); + projection_squash_plannings.emplace_back(settings.min_insert_block_size_rows, settings.min_insert_block_size_bytes); + projection_squashes.emplace_back(ctx->updated_header); } existing_rows_count = 0; @@ -1313,7 +1316,15 @@ bool PartMergerWriter::mutateOriginalPartAndPrepareProjections() Block projection_block; { ProfileEventTimeIncrement watch(ProfileEvents::MutateTaskProjectionsCalculationMicroseconds); - projection_block = projection_squashes[i].add(projection.calculate(cur_block, ctx->context)); + Block to_plan = projection.calculate(cur_block, ctx->context); + Chunk planned_chunk = projection_squash_plannings[i].add({to_plan.getColumns(), to_plan.rows()}); + Chunk projection_chunk; + if (planned_chunk.hasChunkInfo()) + projection_chunk = projection_squashes[i].add(std::move(planned_chunk)); + ColumnsWithTypeAndName cols; + for (size_t j = 0; j < projection_chunk.getNumColumns(); ++j) + cols.push_back(ColumnWithTypeAndName(projection_chunk.getColumns()[j], ctx->updated_header.getDataTypes()[j], ctx->updated_header.getNames()[j])); + projection_block = Block(cols); } if (projection_block) @@ -1337,8 +1348,15 @@ bool PartMergerWriter::mutateOriginalPartAndPrepareProjections() for (size_t i = 0, size = ctx->projections_to_build.size(); i < size; ++i) { const auto & projection = *ctx->projections_to_build[i]; - auto & projection_squash = projection_squashes[i]; - auto projection_block = projection_squash.add({}); + auto & projection_squash_plan = projection_squash_plannings[i]; + auto planned_chunk = projection_squash_plan.flush(); + Chunk projection_chunk; + if (planned_chunk.hasChunkInfo()) + projection_chunk = projection_squashes[i].add(std::move(planned_chunk)); + ColumnsWithTypeAndName cols; + for (size_t j = 0; j < projection_chunk.getNumColumns(); ++j) + cols.push_back(ColumnWithTypeAndName(projection_chunk.getColumns()[j], ctx->updated_header.getDataTypes()[j], ctx->updated_header.getNames()[j])); + auto projection_block = Block(cols); if (projection_block) { auto temp_part = MergeTreeDataWriter::writeTempProjectionPart(