From 78e161ff15b5399aa18141b5cf896353a2fc9e00 Mon Sep 17 00:00:00 2001 From: yariks5s Date: Mon, 27 May 2024 19:02:17 +0000 Subject: [PATCH 01/15] fixes (added header to planner) --- src/Interpreters/Squashing.cpp | 7 ++- src/Interpreters/Squashing.h | 4 +- .../Transforms/PlanSquashingTransform.cpp | 2 +- .../Transforms/SquashingTransform.cpp | 4 +- src/Server/TCPHandler.cpp | 6 +-- src/Storages/MergeTree/MutateTask.cpp | 44 +++++++++---------- 6 files changed, 31 insertions(+), 36 deletions(-) diff --git a/src/Interpreters/Squashing.cpp b/src/Interpreters/Squashing.cpp index 82d80114a85..9ecd92f732c 100644 --- a/src/Interpreters/Squashing.cpp +++ b/src/Interpreters/Squashing.cpp @@ -1,7 +1,6 @@ #include #include #include -#include "Columns/IColumn.h" namespace DB @@ -69,9 +68,10 @@ const ChunksToSquash* ApplySquashing::getInfoFromChunk(const Chunk & chunk) return agg_info; } -PlanSquashing::PlanSquashing(size_t min_block_size_rows_, size_t min_block_size_bytes_) +PlanSquashing::PlanSquashing(Block header_, size_t min_block_size_rows_, size_t min_block_size_bytes_) : min_block_size_rows(min_block_size_rows_) , min_block_size_bytes(min_block_size_bytes_) + , header(std::move(header_)) { } @@ -141,8 +141,7 @@ Chunk PlanSquashing::convertToChunk(std::vector && chunks) chunks.clear(); - Columns cols = {}; - return Chunk(cols, 0, info); + return Chunk(header.cloneEmptyColumns(), 0, info); } void PlanSquashing::expandCurrentSize(size_t rows, size_t bytes) diff --git a/src/Interpreters/Squashing.h b/src/Interpreters/Squashing.h index d9d430c1835..a2928e0eeb6 100644 --- a/src/Interpreters/Squashing.h +++ b/src/Interpreters/Squashing.h @@ -48,7 +48,7 @@ private: class PlanSquashing { public: - PlanSquashing(size_t min_block_size_rows_, size_t min_block_size_bytes_); + explicit PlanSquashing(Block header_, size_t min_block_size_rows_, size_t min_block_size_bytes_); Chunk add(Chunk && input_chunk); Chunk flush(); @@ -68,7 +68,7 @@ private: size_t min_block_size_rows; size_t min_block_size_bytes; - // const Block header; + const Block header; CurrentSize accumulated_size; void expandCurrentSize(size_t rows, size_t bytes); diff --git a/src/Processors/Transforms/PlanSquashingTransform.cpp b/src/Processors/Transforms/PlanSquashingTransform.cpp index 96f41e37d2f..7945bd97e04 100644 --- a/src/Processors/Transforms/PlanSquashingTransform.cpp +++ b/src/Processors/Transforms/PlanSquashingTransform.cpp @@ -11,7 +11,7 @@ namespace ErrorCodes } PlanSquashingTransform::PlanSquashingTransform(const Block & header, size_t min_block_size_rows, size_t min_block_size_bytes, size_t num_ports) - : IProcessor(InputPorts(num_ports, header), OutputPorts(num_ports, header)), balance(min_block_size_rows, min_block_size_bytes) + : IProcessor(InputPorts(num_ports, header), OutputPorts(num_ports, header)), balance(header, min_block_size_rows, min_block_size_bytes) { } diff --git a/src/Processors/Transforms/SquashingTransform.cpp b/src/Processors/Transforms/SquashingTransform.cpp index 6f7c877b2f3..a516811bf45 100644 --- a/src/Processors/Transforms/SquashingTransform.cpp +++ b/src/Processors/Transforms/SquashingTransform.cpp @@ -12,7 +12,7 @@ extern const int LOGICAL_ERROR; SquashingTransform::SquashingTransform( const Block & header, size_t min_block_size_rows, size_t min_block_size_bytes) : ExceptionKeepingTransform(header, header, false) - , planSquashing(min_block_size_rows, min_block_size_bytes) + , planSquashing(header, min_block_size_rows, min_block_size_bytes) , applySquashing(header) { } @@ -60,7 +60,7 @@ void SquashingTransform::work() SimpleSquashingTransform::SimpleSquashingTransform( const Block & header, size_t min_block_size_rows, size_t min_block_size_bytes) : ISimpleTransform(header, header, false) - , planSquashing(min_block_size_rows, min_block_size_bytes) + , planSquashing(header, min_block_size_rows, min_block_size_bytes) , applySquashing(header) { } diff --git a/src/Server/TCPHandler.cpp b/src/Server/TCPHandler.cpp index 476c4dd372b..dfe2d909b43 100644 --- a/src/Server/TCPHandler.cpp +++ b/src/Server/TCPHandler.cpp @@ -885,17 +885,15 @@ AsynchronousInsertQueue::PushResult TCPHandler::processAsyncInsertQuery(Asynchro using PushResult = AsynchronousInsertQueue::PushResult; startInsertQuery(); - PlanSquashing plan_squashing(0, query_context->getSettingsRef().async_insert_max_data_size); + PlanSquashing plan_squashing(state.input_header, 0, query_context->getSettingsRef().async_insert_max_data_size); ApplySquashing apply_squashing(state.input_header); while (readDataNext()) { auto planned_chunk = plan_squashing.add({state.block_for_insert.getColumns(), state.block_for_insert.rows()}); - Chunk result_chunk; if (planned_chunk.hasChunkInfo()) - result_chunk = apply_squashing.add(std::move(planned_chunk)); - if (result_chunk) { + Chunk result_chunk = apply_squashing.add(std::move(planned_chunk)); ColumnsWithTypeAndName cols; for (size_t j = 0; j < result_chunk.getNumColumns(); ++ j) cols.push_back(ColumnWithTypeAndName(result_chunk.getColumns()[j], state.input_header.getDataTypes()[j], state.input_header.getNames()[j])); diff --git a/src/Storages/MergeTree/MutateTask.cpp b/src/Storages/MergeTree/MutateTask.cpp index f7a4651f6fd..5e14d4c5b38 100644 --- a/src/Storages/MergeTree/MutateTask.cpp +++ b/src/Storages/MergeTree/MutateTask.cpp @@ -1267,7 +1267,7 @@ private: ProjectionNameToItsBlocks projection_parts; std::move_iterator projection_parts_iterator; - std::vector projection_squash_plannings; + std::vector projection_squash_plannings; std::vector projection_squashes; const ProjectionsDescription & projections; @@ -1282,12 +1282,15 @@ private: void PartMergerWriter::prepare() { + projection_squash_plannings.reserve(ctx->projections_to_build.size()); + projection_squashes.reserve(ctx->projections_to_build.size()); const auto & settings = ctx->context->getSettingsRef(); for (size_t i = 0, size = ctx->projections_to_build.size(); i < size; ++i) { + PlanSquashing plan_squashing(ctx->updated_header, settings.min_insert_block_size_rows, settings.min_insert_block_size_bytes); // We split the materialization into multiple stages similar to the process of INSERT SELECT query. - projection_squash_plannings.emplace_back(settings.min_insert_block_size_rows, settings.min_insert_block_size_bytes); + projection_squash_plannings.push_back(&plan_squashing); projection_squashes.emplace_back(ctx->updated_header); } @@ -1313,24 +1316,21 @@ bool PartMergerWriter::mutateOriginalPartAndPrepareProjections() { const auto & projection = *ctx->projections_to_build[i]; - Block projection_block; + Chunk planned_chunk; { ProfileEventTimeIncrement watch(ProfileEvents::MutateTaskProjectionsCalculationMicroseconds); - Block to_plan = projection.calculate(cur_block, ctx->context); - Chunk planned_chunk = projection_squash_plannings[i].add({to_plan.getColumns(), to_plan.rows()}); - Chunk projection_chunk; - if (planned_chunk.hasChunkInfo()) - projection_chunk = projection_squashes[i].add(std::move(planned_chunk)); + Block block_to_squash = projection.calculate(cur_block, ctx->context); + planned_chunk = projection_squash_plannings[i]->add({block_to_squash.getColumns(), block_to_squash.rows()}); + } + + if (planned_chunk.hasChunkInfo()) + { + Chunk projection_chunk = projection_squashes[i].add(std::move(planned_chunk)); ColumnsWithTypeAndName cols; for (size_t j = 0; j < projection_chunk.getNumColumns(); ++j) cols.push_back(ColumnWithTypeAndName(projection_chunk.getColumns()[j], ctx->updated_header.getDataTypes()[j], ctx->updated_header.getNames()[j])); - projection_block = Block(cols); - } - - if (projection_block) - { auto tmp_part = MergeTreeDataWriter::writeTempProjectionPart( - *ctx->data, ctx->log, projection_block, projection, ctx->new_data_part.get(), ++block_num); + *ctx->data, ctx->log, Block(cols), projection, ctx->new_data_part.get(), ++block_num); tmp_part.finalize(); tmp_part.part->getDataPartStorage().commitTransaction(); projection_parts[projection.name].emplace_back(std::move(tmp_part.part)); @@ -1349,18 +1349,16 @@ bool PartMergerWriter::mutateOriginalPartAndPrepareProjections() { const auto & projection = *ctx->projections_to_build[i]; auto & projection_squash_plan = projection_squash_plannings[i]; - auto planned_chunk = projection_squash_plan.flush(); - Chunk projection_chunk; + auto planned_chunk = projection_squash_plan->flush(); if (planned_chunk.hasChunkInfo()) - projection_chunk = projection_squashes[i].add(std::move(planned_chunk)); - ColumnsWithTypeAndName cols; - for (size_t j = 0; j < projection_chunk.getNumColumns(); ++j) - cols.push_back(ColumnWithTypeAndName(projection_chunk.getColumns()[j], ctx->updated_header.getDataTypes()[j], ctx->updated_header.getNames()[j])); - auto projection_block = Block(cols); - if (projection_block) { + Chunk projection_chunk = projection_squashes[i].add(std::move(planned_chunk)); + ColumnsWithTypeAndName cols; + for (size_t j = 0; j < projection_chunk.getNumColumns(); ++j) + cols.push_back(ColumnWithTypeAndName(projection_chunk.getColumns()[j], ctx->updated_header.getDataTypes()[j], ctx->updated_header.getNames()[j])); + auto temp_part = MergeTreeDataWriter::writeTempProjectionPart( - *ctx->data, ctx->log, projection_block, projection, ctx->new_data_part.get(), ++block_num); + *ctx->data, ctx->log, Block(cols), projection, ctx->new_data_part.get(), ++block_num); temp_part.finalize(); temp_part.part->getDataPartStorage().commitTransaction(); projection_parts[projection.name].emplace_back(std::move(temp_part.part)); From 01183902a667d94d71ed9faabeffdc60cdcf95cd Mon Sep 17 00:00:00 2001 From: yariks5s Date: Tue, 28 May 2024 12:07:20 +0000 Subject: [PATCH 02/15] try to fix a segfault --- src/Server/TCPHandler.cpp | 10 ++++++---- src/Storages/MergeTree/MutateTask.cpp | 12 ++++++------ 2 files changed, 12 insertions(+), 10 deletions(-) diff --git a/src/Server/TCPHandler.cpp b/src/Server/TCPHandler.cpp index dfe2d909b43..d0e9dc5f3ee 100644 --- a/src/Server/TCPHandler.cpp +++ b/src/Server/TCPHandler.cpp @@ -895,8 +895,9 @@ AsynchronousInsertQueue::PushResult TCPHandler::processAsyncInsertQuery(Asynchro { Chunk result_chunk = apply_squashing.add(std::move(planned_chunk)); ColumnsWithTypeAndName cols; - for (size_t j = 0; j < result_chunk.getNumColumns(); ++ j) - cols.push_back(ColumnWithTypeAndName(result_chunk.getColumns()[j], state.input_header.getDataTypes()[j], state.input_header.getNames()[j])); + if (result_chunk.hasColumns()) + for (size_t j = 0; j < result_chunk.getNumColumns(); ++ j) + cols.push_back(ColumnWithTypeAndName(result_chunk.getColumns()[j], state.input_header.getDataTypes()[j], state.input_header.getNames()[j])); auto result = Block(cols); return PushResult { @@ -911,8 +912,9 @@ AsynchronousInsertQueue::PushResult TCPHandler::processAsyncInsertQuery(Asynchro if (planned_chunk.hasChunkInfo()) result_chunk = apply_squashing.add(std::move(planned_chunk)); ColumnsWithTypeAndName cols; - for (size_t j = 0; j < result_chunk.getNumColumns(); ++ j) - cols.push_back(ColumnWithTypeAndName(result_chunk.getColumns()[j], state.input_header.getDataTypes()[j], state.input_header.getNames()[j])); + if (result_chunk.hasColumns()) + for (size_t j = 0; j < result_chunk.getNumColumns(); ++ j) + cols.push_back(ColumnWithTypeAndName(result_chunk.getColumns()[j], state.input_header.getDataTypes()[j], state.input_header.getNames()[j])); auto result = Block(cols); return insert_queue.pushQueryWithBlock(state.parsed_query, std::move(result), query_context); } diff --git a/src/Storages/MergeTree/MutateTask.cpp b/src/Storages/MergeTree/MutateTask.cpp index 5e14d4c5b38..0e272fc8eb9 100644 --- a/src/Storages/MergeTree/MutateTask.cpp +++ b/src/Storages/MergeTree/MutateTask.cpp @@ -1282,8 +1282,6 @@ private: void PartMergerWriter::prepare() { - projection_squash_plannings.reserve(ctx->projections_to_build.size()); - projection_squashes.reserve(ctx->projections_to_build.size()); const auto & settings = ctx->context->getSettingsRef(); for (size_t i = 0, size = ctx->projections_to_build.size(); i < size; ++i) @@ -1327,8 +1325,9 @@ bool PartMergerWriter::mutateOriginalPartAndPrepareProjections() { Chunk projection_chunk = projection_squashes[i].add(std::move(planned_chunk)); ColumnsWithTypeAndName cols; - for (size_t j = 0; j < projection_chunk.getNumColumns(); ++j) - cols.push_back(ColumnWithTypeAndName(projection_chunk.getColumns()[j], ctx->updated_header.getDataTypes()[j], ctx->updated_header.getNames()[j])); + if (projection_chunk.hasColumns()) + for (size_t j = 0; j < projection_chunk.getNumColumns(); ++j) + cols.push_back(ColumnWithTypeAndName(projection_chunk.getColumns()[j], ctx->updated_header.getDataTypes()[j], ctx->updated_header.getNames()[j])); auto tmp_part = MergeTreeDataWriter::writeTempProjectionPart( *ctx->data, ctx->log, Block(cols), projection, ctx->new_data_part.get(), ++block_num); tmp_part.finalize(); @@ -1354,8 +1353,9 @@ bool PartMergerWriter::mutateOriginalPartAndPrepareProjections() { Chunk projection_chunk = projection_squashes[i].add(std::move(planned_chunk)); ColumnsWithTypeAndName cols; - for (size_t j = 0; j < projection_chunk.getNumColumns(); ++j) - cols.push_back(ColumnWithTypeAndName(projection_chunk.getColumns()[j], ctx->updated_header.getDataTypes()[j], ctx->updated_header.getNames()[j])); + if (projection_chunk.hasColumns()) + for (size_t j = 0; j < projection_chunk.getNumColumns(); ++j) + cols.push_back(ColumnWithTypeAndName(projection_chunk.getColumns()[j], ctx->updated_header.getDataTypes()[j], ctx->updated_header.getNames()[j])); auto temp_part = MergeTreeDataWriter::writeTempProjectionPart( *ctx->data, ctx->log, Block(cols), projection, ctx->new_data_part.get(), ++block_num); From 45f6c19c9df5c3f62b1ed4933321053ef6f77c91 Mon Sep 17 00:00:00 2001 From: yariks5s Date: Tue, 28 May 2024 15:36:19 +0000 Subject: [PATCH 03/15] attempt #2 --- src/Storages/MergeTree/MutateTask.cpp | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/src/Storages/MergeTree/MutateTask.cpp b/src/Storages/MergeTree/MutateTask.cpp index 0e272fc8eb9..8c4e0c6e654 100644 --- a/src/Storages/MergeTree/MutateTask.cpp +++ b/src/Storages/MergeTree/MutateTask.cpp @@ -1315,11 +1315,9 @@ bool PartMergerWriter::mutateOriginalPartAndPrepareProjections() const auto & projection = *ctx->projections_to_build[i]; Chunk planned_chunk; - { - ProfileEventTimeIncrement watch(ProfileEvents::MutateTaskProjectionsCalculationMicroseconds); - Block block_to_squash = projection.calculate(cur_block, ctx->context); - planned_chunk = projection_squash_plannings[i]->add({block_to_squash.getColumns(), block_to_squash.rows()}); - } + ProfileEventTimeIncrement watch(ProfileEvents::MutateTaskProjectionsCalculationMicroseconds); + Block block_to_squash = projection.calculate(cur_block, ctx->context); + planned_chunk = projection_squash_plannings[i]->add({block_to_squash.getColumns(), block_to_squash.rows()}); if (planned_chunk.hasChunkInfo()) { @@ -1327,7 +1325,7 @@ bool PartMergerWriter::mutateOriginalPartAndPrepareProjections() ColumnsWithTypeAndName cols; if (projection_chunk.hasColumns()) for (size_t j = 0; j < projection_chunk.getNumColumns(); ++j) - cols.push_back(ColumnWithTypeAndName(projection_chunk.getColumns()[j], ctx->updated_header.getDataTypes()[j], ctx->updated_header.getNames()[j])); + cols.push_back(ColumnWithTypeAndName(projection_chunk.getColumns()[j], block_to_squash.getDataTypes()[j], block_to_squash.getNames()[j])); auto tmp_part = MergeTreeDataWriter::writeTempProjectionPart( *ctx->data, ctx->log, Block(cols), projection, ctx->new_data_part.get(), ++block_num); tmp_part.finalize(); From f46a7d64a0163e0cf9140eb0e56c88f2cc6471bb Mon Sep 17 00:00:00 2001 From: yariks5s Date: Tue, 28 May 2024 17:00:35 +0000 Subject: [PATCH 04/15] fix segfault in TCPHandler --- src/Server/TCPHandler.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/Server/TCPHandler.cpp b/src/Server/TCPHandler.cpp index d0e9dc5f3ee..b95face57e1 100644 --- a/src/Server/TCPHandler.cpp +++ b/src/Server/TCPHandler.cpp @@ -897,7 +897,7 @@ AsynchronousInsertQueue::PushResult TCPHandler::processAsyncInsertQuery(Asynchro ColumnsWithTypeAndName cols; if (result_chunk.hasColumns()) for (size_t j = 0; j < result_chunk.getNumColumns(); ++ j) - cols.push_back(ColumnWithTypeAndName(result_chunk.getColumns()[j], state.input_header.getDataTypes()[j], state.input_header.getNames()[j])); + cols.push_back(ColumnWithTypeAndName(result_chunk.getColumns()[j], state.block_for_insert.getDataTypes()[j], state.block_for_insert.getNames()[j])); auto result = Block(cols); return PushResult { @@ -914,7 +914,7 @@ AsynchronousInsertQueue::PushResult TCPHandler::processAsyncInsertQuery(Asynchro ColumnsWithTypeAndName cols; if (result_chunk.hasColumns()) for (size_t j = 0; j < result_chunk.getNumColumns(); ++ j) - cols.push_back(ColumnWithTypeAndName(result_chunk.getColumns()[j], state.input_header.getDataTypes()[j], state.input_header.getNames()[j])); + cols.push_back(ColumnWithTypeAndName(result_chunk.getColumns()[j], state.block_for_insert.getDataTypes()[j], state.block_for_insert.getNames()[j])); auto result = Block(cols); return insert_queue.pushQueryWithBlock(state.parsed_query, std::move(result), query_context); } From 513de6ce19867dc10fedf5c9820363b84655a9f1 Mon Sep 17 00:00:00 2001 From: yariks5s Date: Tue, 28 May 2024 17:59:44 +0000 Subject: [PATCH 05/15] using of header from applySquashing --- src/Interpreters/Squashing.h | 2 +- src/Storages/MergeTree/MutateTask.cpp | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/Interpreters/Squashing.h b/src/Interpreters/Squashing.h index a2928e0eeb6..05259bbc0c3 100644 --- a/src/Interpreters/Squashing.h +++ b/src/Interpreters/Squashing.h @@ -32,10 +32,10 @@ public: explicit ApplySquashing(Block header_); Chunk add(Chunk && input_chunk); + const Block header; private: Chunk accumulated_chunk; - const Block header; const ChunksToSquash * getInfoFromChunk(const Chunk & chunk); diff --git a/src/Storages/MergeTree/MutateTask.cpp b/src/Storages/MergeTree/MutateTask.cpp index 8c4e0c6e654..0a3a217d943 100644 --- a/src/Storages/MergeTree/MutateTask.cpp +++ b/src/Storages/MergeTree/MutateTask.cpp @@ -1353,7 +1353,7 @@ bool PartMergerWriter::mutateOriginalPartAndPrepareProjections() ColumnsWithTypeAndName cols; if (projection_chunk.hasColumns()) for (size_t j = 0; j < projection_chunk.getNumColumns(); ++j) - cols.push_back(ColumnWithTypeAndName(projection_chunk.getColumns()[j], ctx->updated_header.getDataTypes()[j], ctx->updated_header.getNames()[j])); + cols.push_back(ColumnWithTypeAndName(projection_chunk.getColumns()[j], projection_squashes[i].header.getDataTypes()[j], projection_squashes[i].header.getNames()[j])); auto temp_part = MergeTreeDataWriter::writeTempProjectionPart( *ctx->data, ctx->log, Block(cols), projection, ctx->new_data_part.get(), ++block_num); From f0e9d6b459cfee4331861d4f0e3c92c1e9d67c72 Mon Sep 17 00:00:00 2001 From: yariks5s Date: Wed, 29 May 2024 14:30:48 +0000 Subject: [PATCH 06/15] revert changes in mv --- .../Transforms/buildPushingToViewsChain.cpp | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/src/Processors/Transforms/buildPushingToViewsChain.cpp b/src/Processors/Transforms/buildPushingToViewsChain.cpp index ff1be9323f5..177d45650dd 100644 --- a/src/Processors/Transforms/buildPushingToViewsChain.cpp +++ b/src/Processors/Transforms/buildPushingToViewsChain.cpp @@ -367,13 +367,16 @@ std::optional generateViewChain( bool check_access = !materialized_view->hasInnerTable() && materialized_view->getInMemoryMetadataPtr()->sql_security_type; out = interpreter.buildChain(inner_table, inner_metadata_snapshot, insert_columns, thread_status_holder, view_counter_ms, check_access); - bool table_prefers_large_blocks = inner_table->prefersLargeBlocks(); - const auto & settings = insert_context->getSettingsRef(); + if (interpreter.shouldAddSquashingFroStorage(inner_table)) + { + bool table_prefers_large_blocks = inner_table->prefersLargeBlocks(); + const auto & settings = insert_context->getSettingsRef(); - out.addSource(std::make_shared( - out.getInputHeader(), - table_prefers_large_blocks ? settings.min_insert_block_size_rows : settings.max_block_size, - table_prefers_large_blocks ? settings.min_insert_block_size_bytes : 0ULL)); + out.addSource(std::make_shared( + out.getInputHeader(), + table_prefers_large_blocks ? settings.min_insert_block_size_rows : settings.max_block_size, + table_prefers_large_blocks ? settings.min_insert_block_size_bytes : 0ULL)); + } auto counting = std::make_shared(out.getInputHeader(), current_thread, insert_context->getQuota()); counting->setProcessListElement(insert_context->getProcessListElement()); From f51a145437df6f173d67e5fc7f1259c1e0154a98 Mon Sep 17 00:00:00 2001 From: yariks5s Date: Wed, 29 May 2024 15:00:22 +0000 Subject: [PATCH 07/15] fixes for segfault --- src/Server/TCPHandler.cpp | 8 ++++---- src/Storages/MergeTree/MutateTask.cpp | 6 +++--- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/src/Server/TCPHandler.cpp b/src/Server/TCPHandler.cpp index b95face57e1..af184940c7e 100644 --- a/src/Server/TCPHandler.cpp +++ b/src/Server/TCPHandler.cpp @@ -895,9 +895,9 @@ AsynchronousInsertQueue::PushResult TCPHandler::processAsyncInsertQuery(Asynchro { Chunk result_chunk = apply_squashing.add(std::move(planned_chunk)); ColumnsWithTypeAndName cols; - if (result_chunk.hasColumns()) + if (result_chunk.hasColumns() && apply_squashing.header) for (size_t j = 0; j < result_chunk.getNumColumns(); ++ j) - cols.push_back(ColumnWithTypeAndName(result_chunk.getColumns()[j], state.block_for_insert.getDataTypes()[j], state.block_for_insert.getNames()[j])); + cols.push_back(ColumnWithTypeAndName(result_chunk.getColumns()[j], apply_squashing.header.getDataTypes()[j], apply_squashing.header.getNames()[j])); auto result = Block(cols); return PushResult { @@ -912,9 +912,9 @@ AsynchronousInsertQueue::PushResult TCPHandler::processAsyncInsertQuery(Asynchro if (planned_chunk.hasChunkInfo()) result_chunk = apply_squashing.add(std::move(planned_chunk)); ColumnsWithTypeAndName cols; - if (result_chunk.hasColumns()) + if (result_chunk.hasColumns() && apply_squashing.header) for (size_t j = 0; j < result_chunk.getNumColumns(); ++ j) - cols.push_back(ColumnWithTypeAndName(result_chunk.getColumns()[j], state.block_for_insert.getDataTypes()[j], state.block_for_insert.getNames()[j])); + cols.push_back(ColumnWithTypeAndName(result_chunk.getColumns()[j], apply_squashing.header.getDataTypes()[j], apply_squashing.header.getNames()[j])); auto result = Block(cols); return insert_queue.pushQueryWithBlock(state.parsed_query, std::move(result), query_context); } diff --git a/src/Storages/MergeTree/MutateTask.cpp b/src/Storages/MergeTree/MutateTask.cpp index 0a3a217d943..3469b609f6b 100644 --- a/src/Storages/MergeTree/MutateTask.cpp +++ b/src/Storages/MergeTree/MutateTask.cpp @@ -1323,9 +1323,9 @@ bool PartMergerWriter::mutateOriginalPartAndPrepareProjections() { Chunk projection_chunk = projection_squashes[i].add(std::move(planned_chunk)); ColumnsWithTypeAndName cols; - if (projection_chunk.hasColumns()) + if (projection_chunk.hasColumns() && projection_squashes[i].header) for (size_t j = 0; j < projection_chunk.getNumColumns(); ++j) - cols.push_back(ColumnWithTypeAndName(projection_chunk.getColumns()[j], block_to_squash.getDataTypes()[j], block_to_squash.getNames()[j])); + cols.push_back(ColumnWithTypeAndName(projection_chunk.getColumns()[j], projection_squashes[i].header.getDataTypes()[j], projection_squashes[i].header.getNames()[j])); auto tmp_part = MergeTreeDataWriter::writeTempProjectionPart( *ctx->data, ctx->log, Block(cols), projection, ctx->new_data_part.get(), ++block_num); tmp_part.finalize(); @@ -1351,7 +1351,7 @@ bool PartMergerWriter::mutateOriginalPartAndPrepareProjections() { Chunk projection_chunk = projection_squashes[i].add(std::move(planned_chunk)); ColumnsWithTypeAndName cols; - if (projection_chunk.hasColumns()) + if (projection_chunk.hasColumns() && projection_squashes[i].header) for (size_t j = 0; j < projection_chunk.getNumColumns(); ++j) cols.push_back(ColumnWithTypeAndName(projection_chunk.getColumns()[j], projection_squashes[i].header.getDataTypes()[j], projection_squashes[i].header.getNames()[j])); From d351c05243cc42dd05b3a4edf90dfe2044786e9a Mon Sep 17 00:00:00 2001 From: yariks5s Date: Wed, 29 May 2024 16:13:21 +0000 Subject: [PATCH 08/15] reset + try to fix mv and mutations --- .../Transforms/buildPushingToViewsChain.cpp | 15 +++++++++------ src/Server/TCPHandler.cpp | 6 +++--- src/Storages/MergeTree/MutateTask.cpp | 4 ++-- 3 files changed, 14 insertions(+), 11 deletions(-) diff --git a/src/Processors/Transforms/buildPushingToViewsChain.cpp b/src/Processors/Transforms/buildPushingToViewsChain.cpp index ff1be9323f5..177d45650dd 100644 --- a/src/Processors/Transforms/buildPushingToViewsChain.cpp +++ b/src/Processors/Transforms/buildPushingToViewsChain.cpp @@ -367,13 +367,16 @@ std::optional generateViewChain( bool check_access = !materialized_view->hasInnerTable() && materialized_view->getInMemoryMetadataPtr()->sql_security_type; out = interpreter.buildChain(inner_table, inner_metadata_snapshot, insert_columns, thread_status_holder, view_counter_ms, check_access); - bool table_prefers_large_blocks = inner_table->prefersLargeBlocks(); - const auto & settings = insert_context->getSettingsRef(); + if (interpreter.shouldAddSquashingFroStorage(inner_table)) + { + bool table_prefers_large_blocks = inner_table->prefersLargeBlocks(); + const auto & settings = insert_context->getSettingsRef(); - out.addSource(std::make_shared( - out.getInputHeader(), - table_prefers_large_blocks ? settings.min_insert_block_size_rows : settings.max_block_size, - table_prefers_large_blocks ? settings.min_insert_block_size_bytes : 0ULL)); + out.addSource(std::make_shared( + out.getInputHeader(), + table_prefers_large_blocks ? settings.min_insert_block_size_rows : settings.max_block_size, + table_prefers_large_blocks ? settings.min_insert_block_size_bytes : 0ULL)); + } auto counting = std::make_shared(out.getInputHeader(), current_thread, insert_context->getQuota()); counting->setProcessListElement(insert_context->getProcessListElement()); diff --git a/src/Server/TCPHandler.cpp b/src/Server/TCPHandler.cpp index b95face57e1..3cbaffe857a 100644 --- a/src/Server/TCPHandler.cpp +++ b/src/Server/TCPHandler.cpp @@ -895,7 +895,7 @@ AsynchronousInsertQueue::PushResult TCPHandler::processAsyncInsertQuery(Asynchro { Chunk result_chunk = apply_squashing.add(std::move(planned_chunk)); ColumnsWithTypeAndName cols; - if (result_chunk.hasColumns()) + if (result_chunk.hasColumns() && state.block_for_insert) for (size_t j = 0; j < result_chunk.getNumColumns(); ++ j) cols.push_back(ColumnWithTypeAndName(result_chunk.getColumns()[j], state.block_for_insert.getDataTypes()[j], state.block_for_insert.getNames()[j])); auto result = Block(cols); @@ -912,9 +912,9 @@ AsynchronousInsertQueue::PushResult TCPHandler::processAsyncInsertQuery(Asynchro if (planned_chunk.hasChunkInfo()) result_chunk = apply_squashing.add(std::move(planned_chunk)); ColumnsWithTypeAndName cols; - if (result_chunk.hasColumns()) + if (result_chunk.hasColumns() && apply_squashing.header) for (size_t j = 0; j < result_chunk.getNumColumns(); ++ j) - cols.push_back(ColumnWithTypeAndName(result_chunk.getColumns()[j], state.block_for_insert.getDataTypes()[j], state.block_for_insert.getNames()[j])); + cols.push_back(ColumnWithTypeAndName(result_chunk.getColumns()[j], apply_squashing.header.getDataTypes()[j], apply_squashing.header.getNames()[j])); auto result = Block(cols); return insert_queue.pushQueryWithBlock(state.parsed_query, std::move(result), query_context); } diff --git a/src/Storages/MergeTree/MutateTask.cpp b/src/Storages/MergeTree/MutateTask.cpp index 0a3a217d943..af36b7bb3e8 100644 --- a/src/Storages/MergeTree/MutateTask.cpp +++ b/src/Storages/MergeTree/MutateTask.cpp @@ -1323,7 +1323,7 @@ bool PartMergerWriter::mutateOriginalPartAndPrepareProjections() { Chunk projection_chunk = projection_squashes[i].add(std::move(planned_chunk)); ColumnsWithTypeAndName cols; - if (projection_chunk.hasColumns()) + if (projection_chunk.hasColumns() && block_to_squash) for (size_t j = 0; j < projection_chunk.getNumColumns(); ++j) cols.push_back(ColumnWithTypeAndName(projection_chunk.getColumns()[j], block_to_squash.getDataTypes()[j], block_to_squash.getNames()[j])); auto tmp_part = MergeTreeDataWriter::writeTempProjectionPart( @@ -1351,7 +1351,7 @@ bool PartMergerWriter::mutateOriginalPartAndPrepareProjections() { Chunk projection_chunk = projection_squashes[i].add(std::move(planned_chunk)); ColumnsWithTypeAndName cols; - if (projection_chunk.hasColumns()) + if (projection_chunk.hasColumns() && projection_squashes[i].header) for (size_t j = 0; j < projection_chunk.getNumColumns(); ++j) cols.push_back(ColumnWithTypeAndName(projection_chunk.getColumns()[j], projection_squashes[i].header.getDataTypes()[j], projection_squashes[i].header.getNames()[j])); From d86580ef049fc402d48808e3c125a61f824ed40f Mon Sep 17 00:00:00 2001 From: yariks5s Date: Wed, 29 May 2024 20:37:49 +0000 Subject: [PATCH 09/15] try to fix segfaults --- src/Interpreters/Squashing.h | 11 ++++++++++- src/Server/TCPHandler.cpp | 4 ++-- src/Storages/MergeTree/MutateTask.cpp | 5 +++-- 3 files changed, 15 insertions(+), 5 deletions(-) mode change 100644 => 100755 src/Interpreters/Squashing.h mode change 100644 => 100755 src/Server/TCPHandler.cpp mode change 100644 => 100755 src/Storages/MergeTree/MutateTask.cpp diff --git a/src/Interpreters/Squashing.h b/src/Interpreters/Squashing.h old mode 100644 new mode 100755 index 05259bbc0c3..84e67e5d4c1 --- a/src/Interpreters/Squashing.h +++ b/src/Interpreters/Squashing.h @@ -32,10 +32,19 @@ public: explicit ApplySquashing(Block header_); Chunk add(Chunk && input_chunk); - const Block header; + + void setHeader(Block header_) + { + header = header_; + } + Block getHeader() + { + return header; + } private: Chunk accumulated_chunk; + Block header; const ChunksToSquash * getInfoFromChunk(const Chunk & chunk); diff --git a/src/Server/TCPHandler.cpp b/src/Server/TCPHandler.cpp old mode 100644 new mode 100755 index 3cbaffe857a..77f84dba6e4 --- a/src/Server/TCPHandler.cpp +++ b/src/Server/TCPHandler.cpp @@ -912,9 +912,9 @@ AsynchronousInsertQueue::PushResult TCPHandler::processAsyncInsertQuery(Asynchro if (planned_chunk.hasChunkInfo()) result_chunk = apply_squashing.add(std::move(planned_chunk)); ColumnsWithTypeAndName cols; - if (result_chunk.hasColumns() && apply_squashing.header) + if (result_chunk.hasColumns() && apply_squashing.getHeader()) for (size_t j = 0; j < result_chunk.getNumColumns(); ++ j) - cols.push_back(ColumnWithTypeAndName(result_chunk.getColumns()[j], apply_squashing.header.getDataTypes()[j], apply_squashing.header.getNames()[j])); + cols.push_back(ColumnWithTypeAndName(result_chunk.getColumns()[j], apply_squashing.getHeader().getDataTypes()[j], apply_squashing.getHeader().getNames()[j])); auto result = Block(cols); return insert_queue.pushQueryWithBlock(state.parsed_query, std::move(result), query_context); } diff --git a/src/Storages/MergeTree/MutateTask.cpp b/src/Storages/MergeTree/MutateTask.cpp old mode 100644 new mode 100755 index af36b7bb3e8..ff1d7c0b7c2 --- a/src/Storages/MergeTree/MutateTask.cpp +++ b/src/Storages/MergeTree/MutateTask.cpp @@ -1331,6 +1331,7 @@ bool PartMergerWriter::mutateOriginalPartAndPrepareProjections() tmp_part.finalize(); tmp_part.part->getDataPartStorage().commitTransaction(); projection_parts[projection.name].emplace_back(std::move(tmp_part.part)); + projection_squashes[i].setHeader(block_to_squash); } } @@ -1351,9 +1352,9 @@ bool PartMergerWriter::mutateOriginalPartAndPrepareProjections() { Chunk projection_chunk = projection_squashes[i].add(std::move(planned_chunk)); ColumnsWithTypeAndName cols; - if (projection_chunk.hasColumns() && projection_squashes[i].header) + if (projection_chunk.hasColumns() && projection_squashes[i].getHeader()) for (size_t j = 0; j < projection_chunk.getNumColumns(); ++j) - cols.push_back(ColumnWithTypeAndName(projection_chunk.getColumns()[j], projection_squashes[i].header.getDataTypes()[j], projection_squashes[i].header.getNames()[j])); + cols.push_back(ColumnWithTypeAndName(projection_chunk.getColumns()[j], projection_squashes[i].getHeader().getDataTypes()[j], projection_squashes[i].getHeader().getNames()[j])); auto temp_part = MergeTreeDataWriter::writeTempProjectionPart( *ctx->data, ctx->log, Block(cols), projection, ctx->new_data_part.get(), ++block_num); From aa52e9036ef1aef21c037bb2d8f3722f4cd24de3 Mon Sep 17 00:00:00 2001 From: yariks5s Date: Wed, 29 May 2024 23:29:19 +0000 Subject: [PATCH 10/15] reset last commit --- src/Interpreters/Squashing.h | 11 ++++++++++- src/Server/TCPHandler.cpp | 4 ++-- src/Storages/MergeTree/MutateTask.cpp | 5 +++-- 3 files changed, 15 insertions(+), 5 deletions(-) mode change 100644 => 100755 src/Interpreters/Squashing.h mode change 100644 => 100755 src/Server/TCPHandler.cpp mode change 100644 => 100755 src/Storages/MergeTree/MutateTask.cpp diff --git a/src/Interpreters/Squashing.h b/src/Interpreters/Squashing.h old mode 100644 new mode 100755 index 05259bbc0c3..84e67e5d4c1 --- a/src/Interpreters/Squashing.h +++ b/src/Interpreters/Squashing.h @@ -32,10 +32,19 @@ public: explicit ApplySquashing(Block header_); Chunk add(Chunk && input_chunk); - const Block header; + + void setHeader(Block header_) + { + header = header_; + } + Block getHeader() + { + return header; + } private: Chunk accumulated_chunk; + Block header; const ChunksToSquash * getInfoFromChunk(const Chunk & chunk); diff --git a/src/Server/TCPHandler.cpp b/src/Server/TCPHandler.cpp old mode 100644 new mode 100755 index 3cbaffe857a..77f84dba6e4 --- a/src/Server/TCPHandler.cpp +++ b/src/Server/TCPHandler.cpp @@ -912,9 +912,9 @@ AsynchronousInsertQueue::PushResult TCPHandler::processAsyncInsertQuery(Asynchro if (planned_chunk.hasChunkInfo()) result_chunk = apply_squashing.add(std::move(planned_chunk)); ColumnsWithTypeAndName cols; - if (result_chunk.hasColumns() && apply_squashing.header) + if (result_chunk.hasColumns() && apply_squashing.getHeader()) for (size_t j = 0; j < result_chunk.getNumColumns(); ++ j) - cols.push_back(ColumnWithTypeAndName(result_chunk.getColumns()[j], apply_squashing.header.getDataTypes()[j], apply_squashing.header.getNames()[j])); + cols.push_back(ColumnWithTypeAndName(result_chunk.getColumns()[j], apply_squashing.getHeader().getDataTypes()[j], apply_squashing.getHeader().getNames()[j])); auto result = Block(cols); return insert_queue.pushQueryWithBlock(state.parsed_query, std::move(result), query_context); } diff --git a/src/Storages/MergeTree/MutateTask.cpp b/src/Storages/MergeTree/MutateTask.cpp old mode 100644 new mode 100755 index af36b7bb3e8..ff1d7c0b7c2 --- a/src/Storages/MergeTree/MutateTask.cpp +++ b/src/Storages/MergeTree/MutateTask.cpp @@ -1331,6 +1331,7 @@ bool PartMergerWriter::mutateOriginalPartAndPrepareProjections() tmp_part.finalize(); tmp_part.part->getDataPartStorage().commitTransaction(); projection_parts[projection.name].emplace_back(std::move(tmp_part.part)); + projection_squashes[i].setHeader(block_to_squash); } } @@ -1351,9 +1352,9 @@ bool PartMergerWriter::mutateOriginalPartAndPrepareProjections() { Chunk projection_chunk = projection_squashes[i].add(std::move(planned_chunk)); ColumnsWithTypeAndName cols; - if (projection_chunk.hasColumns() && projection_squashes[i].header) + if (projection_chunk.hasColumns() && projection_squashes[i].getHeader()) for (size_t j = 0; j < projection_chunk.getNumColumns(); ++j) - cols.push_back(ColumnWithTypeAndName(projection_chunk.getColumns()[j], projection_squashes[i].header.getDataTypes()[j], projection_squashes[i].header.getNames()[j])); + cols.push_back(ColumnWithTypeAndName(projection_chunk.getColumns()[j], projection_squashes[i].getHeader().getDataTypes()[j], projection_squashes[i].getHeader().getNames()[j])); auto temp_part = MergeTreeDataWriter::writeTempProjectionPart( *ctx->data, ctx->log, Block(cols), projection, ctx->new_data_part.get(), ++block_num); From b160548aafc07e0db47ece097943cf3e61422c4c Mon Sep 17 00:00:00 2001 From: yariks5s Date: Wed, 29 May 2024 23:39:16 +0000 Subject: [PATCH 11/15] change the chmod back --- src/Interpreters/Squashing.h | 0 src/Server/TCPHandler.cpp | 0 src/Storages/MergeTree/MutateTask.cpp | 0 3 files changed, 0 insertions(+), 0 deletions(-) mode change 100755 => 100644 src/Interpreters/Squashing.h mode change 100755 => 100644 src/Server/TCPHandler.cpp mode change 100755 => 100644 src/Storages/MergeTree/MutateTask.cpp diff --git a/src/Interpreters/Squashing.h b/src/Interpreters/Squashing.h old mode 100755 new mode 100644 diff --git a/src/Server/TCPHandler.cpp b/src/Server/TCPHandler.cpp old mode 100755 new mode 100644 diff --git a/src/Storages/MergeTree/MutateTask.cpp b/src/Storages/MergeTree/MutateTask.cpp old mode 100755 new mode 100644 From 3e0947c759f5b9a70add338681cfcb660388e2a8 Mon Sep 17 00:00:00 2001 From: yariks5s Date: Thu, 30 May 2024 00:16:35 +0000 Subject: [PATCH 12/15] try to remove if in mv --- .../Transforms/buildPushingToViewsChain.cpp | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/src/Processors/Transforms/buildPushingToViewsChain.cpp b/src/Processors/Transforms/buildPushingToViewsChain.cpp index 177d45650dd..ff1be9323f5 100644 --- a/src/Processors/Transforms/buildPushingToViewsChain.cpp +++ b/src/Processors/Transforms/buildPushingToViewsChain.cpp @@ -367,16 +367,13 @@ std::optional generateViewChain( bool check_access = !materialized_view->hasInnerTable() && materialized_view->getInMemoryMetadataPtr()->sql_security_type; out = interpreter.buildChain(inner_table, inner_metadata_snapshot, insert_columns, thread_status_holder, view_counter_ms, check_access); - if (interpreter.shouldAddSquashingFroStorage(inner_table)) - { - bool table_prefers_large_blocks = inner_table->prefersLargeBlocks(); - const auto & settings = insert_context->getSettingsRef(); + bool table_prefers_large_blocks = inner_table->prefersLargeBlocks(); + const auto & settings = insert_context->getSettingsRef(); - out.addSource(std::make_shared( - out.getInputHeader(), - table_prefers_large_blocks ? settings.min_insert_block_size_rows : settings.max_block_size, - table_prefers_large_blocks ? settings.min_insert_block_size_bytes : 0ULL)); - } + out.addSource(std::make_shared( + out.getInputHeader(), + table_prefers_large_blocks ? settings.min_insert_block_size_rows : settings.max_block_size, + table_prefers_large_blocks ? settings.min_insert_block_size_bytes : 0ULL)); auto counting = std::make_shared(out.getInputHeader(), current_thread, insert_context->getQuota()); counting->setProcessListElement(insert_context->getProcessListElement()); From cadf9d466664ceb693dbebb31a3f3df57af84c8b Mon Sep 17 00:00:00 2001 From: yariks5s Date: Thu, 30 May 2024 10:41:36 +0000 Subject: [PATCH 13/15] Revert "try to remove if in mv" This reverts commit 3e0947c759f5b9a70add338681cfcb660388e2a8. --- .../Transforms/buildPushingToViewsChain.cpp | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/src/Processors/Transforms/buildPushingToViewsChain.cpp b/src/Processors/Transforms/buildPushingToViewsChain.cpp index ff1be9323f5..177d45650dd 100644 --- a/src/Processors/Transforms/buildPushingToViewsChain.cpp +++ b/src/Processors/Transforms/buildPushingToViewsChain.cpp @@ -367,13 +367,16 @@ std::optional generateViewChain( bool check_access = !materialized_view->hasInnerTable() && materialized_view->getInMemoryMetadataPtr()->sql_security_type; out = interpreter.buildChain(inner_table, inner_metadata_snapshot, insert_columns, thread_status_holder, view_counter_ms, check_access); - bool table_prefers_large_blocks = inner_table->prefersLargeBlocks(); - const auto & settings = insert_context->getSettingsRef(); + if (interpreter.shouldAddSquashingFroStorage(inner_table)) + { + bool table_prefers_large_blocks = inner_table->prefersLargeBlocks(); + const auto & settings = insert_context->getSettingsRef(); - out.addSource(std::make_shared( - out.getInputHeader(), - table_prefers_large_blocks ? settings.min_insert_block_size_rows : settings.max_block_size, - table_prefers_large_blocks ? settings.min_insert_block_size_bytes : 0ULL)); + out.addSource(std::make_shared( + out.getInputHeader(), + table_prefers_large_blocks ? settings.min_insert_block_size_rows : settings.max_block_size, + table_prefers_large_blocks ? settings.min_insert_block_size_bytes : 0ULL)); + } auto counting = std::make_shared(out.getInputHeader(), current_thread, insert_context->getQuota()); counting->setProcessListElement(insert_context->getProcessListElement()); From 6c6bf069e211c17182d6b54d0afdaff48f932bfe Mon Sep 17 00:00:00 2001 From: yariks5s Date: Thu, 30 May 2024 12:07:18 +0000 Subject: [PATCH 14/15] remove moving of header --- src/Interpreters/Squashing.cpp | 4 ++-- src/Storages/MergeTree/MutateTask.cpp | 3 +-- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/src/Interpreters/Squashing.cpp b/src/Interpreters/Squashing.cpp index 9ecd92f732c..47add495421 100644 --- a/src/Interpreters/Squashing.cpp +++ b/src/Interpreters/Squashing.cpp @@ -11,7 +11,7 @@ namespace ErrorCodes } ApplySquashing::ApplySquashing(Block header_) - : header(std::move(header_)) + : header(header_) { } @@ -71,7 +71,7 @@ const ChunksToSquash* ApplySquashing::getInfoFromChunk(const Chunk & chunk) PlanSquashing::PlanSquashing(Block header_, size_t min_block_size_rows_, size_t min_block_size_bytes_) : min_block_size_rows(min_block_size_rows_) , min_block_size_bytes(min_block_size_bytes_) - , header(std::move(header_)) + , header(header_) { } diff --git a/src/Storages/MergeTree/MutateTask.cpp b/src/Storages/MergeTree/MutateTask.cpp index ff1d7c0b7c2..2269b16b443 100644 --- a/src/Storages/MergeTree/MutateTask.cpp +++ b/src/Storages/MergeTree/MutateTask.cpp @@ -1314,10 +1314,9 @@ bool PartMergerWriter::mutateOriginalPartAndPrepareProjections() { const auto & projection = *ctx->projections_to_build[i]; - Chunk planned_chunk; ProfileEventTimeIncrement watch(ProfileEvents::MutateTaskProjectionsCalculationMicroseconds); Block block_to_squash = projection.calculate(cur_block, ctx->context); - planned_chunk = projection_squash_plannings[i]->add({block_to_squash.getColumns(), block_to_squash.rows()}); + Chunk planned_chunk = projection_squash_plannings[i]->add({block_to_squash.getColumns(), block_to_squash.rows()}); if (planned_chunk.hasChunkInfo()) { From 0579fc9436528221d88ffc02f23e42de7ad4dc81 Mon Sep 17 00:00:00 2001 From: yariks5s Date: Thu, 30 May 2024 14:48:14 +0000 Subject: [PATCH 15/15] remove moving from planSquashing --- src/Interpreters/Squashing.cpp | 2 +- src/Interpreters/Squashing.h | 2 +- src/Processors/Transforms/PlanSquashingTransform.cpp | 2 +- src/Processors/Transforms/SquashingTransform.cpp | 4 ++-- src/Server/TCPHandler.cpp | 3 ++- src/Storages/MergeTree/MutateTask.cpp | 5 +++-- 6 files changed, 10 insertions(+), 8 deletions(-) diff --git a/src/Interpreters/Squashing.cpp b/src/Interpreters/Squashing.cpp index 47add495421..6706399a3d2 100644 --- a/src/Interpreters/Squashing.cpp +++ b/src/Interpreters/Squashing.cpp @@ -80,7 +80,7 @@ Chunk PlanSquashing::flush() return convertToChunk(std::move(chunks_to_merge_vec)); } -Chunk PlanSquashing::add(Chunk && input_chunk) +Chunk PlanSquashing::add(Chunk & input_chunk) { if (!input_chunk) return {}; diff --git a/src/Interpreters/Squashing.h b/src/Interpreters/Squashing.h index 84e67e5d4c1..802e77847e9 100644 --- a/src/Interpreters/Squashing.h +++ b/src/Interpreters/Squashing.h @@ -59,7 +59,7 @@ class PlanSquashing public: explicit PlanSquashing(Block header_, size_t min_block_size_rows_, size_t min_block_size_bytes_); - Chunk add(Chunk && input_chunk); + Chunk add(Chunk & input_chunk); Chunk flush(); bool isDataLeft() { diff --git a/src/Processors/Transforms/PlanSquashingTransform.cpp b/src/Processors/Transforms/PlanSquashingTransform.cpp index 7945bd97e04..1384f760d48 100644 --- a/src/Processors/Transforms/PlanSquashingTransform.cpp +++ b/src/Processors/Transforms/PlanSquashingTransform.cpp @@ -134,7 +134,7 @@ IProcessor::Status PlanSquashingTransform::waitForDataIn() void PlanSquashingTransform::transform(Chunk & chunk_) { - Chunk res_chunk = balance.add(std::move(chunk_)); + Chunk res_chunk = balance.add(chunk_); std::swap(res_chunk, chunk_); } diff --git a/src/Processors/Transforms/SquashingTransform.cpp b/src/Processors/Transforms/SquashingTransform.cpp index a516811bf45..67358316d48 100644 --- a/src/Processors/Transforms/SquashingTransform.cpp +++ b/src/Processors/Transforms/SquashingTransform.cpp @@ -19,7 +19,7 @@ SquashingTransform::SquashingTransform( void SquashingTransform::onConsume(Chunk chunk) { - Chunk planned_chunk = planSquashing.add(std::move(chunk)); + Chunk planned_chunk = planSquashing.add(chunk); if (planned_chunk.hasChunkInfo()) cur_chunk = applySquashing.add(std::move(planned_chunk)); } @@ -69,7 +69,7 @@ void SimpleSquashingTransform::transform(Chunk & chunk) { if (!finished) { - Chunk planned_chunk = planSquashing.add(std::move(chunk)); + Chunk planned_chunk = planSquashing.add(chunk); if (planned_chunk.hasChunkInfo()) chunk = applySquashing.add(std::move(planned_chunk)); } diff --git a/src/Server/TCPHandler.cpp b/src/Server/TCPHandler.cpp index 77f84dba6e4..6973808078c 100644 --- a/src/Server/TCPHandler.cpp +++ b/src/Server/TCPHandler.cpp @@ -890,7 +890,8 @@ AsynchronousInsertQueue::PushResult TCPHandler::processAsyncInsertQuery(Asynchro while (readDataNext()) { - auto planned_chunk = plan_squashing.add({state.block_for_insert.getColumns(), state.block_for_insert.rows()}); + Chunk input_chunk = {state.block_for_insert.getColumns(), state.block_for_insert.rows()}; + auto planned_chunk = plan_squashing.add(input_chunk); if (planned_chunk.hasChunkInfo()) { Chunk result_chunk = apply_squashing.add(std::move(planned_chunk)); diff --git a/src/Storages/MergeTree/MutateTask.cpp b/src/Storages/MergeTree/MutateTask.cpp index 2269b16b443..5267143bf65 100644 --- a/src/Storages/MergeTree/MutateTask.cpp +++ b/src/Storages/MergeTree/MutateTask.cpp @@ -1314,9 +1314,10 @@ bool PartMergerWriter::mutateOriginalPartAndPrepareProjections() { const auto & projection = *ctx->projections_to_build[i]; - ProfileEventTimeIncrement watch(ProfileEvents::MutateTaskProjectionsCalculationMicroseconds); + ProfileEventTimeIncrement watch(ProfileEvents::MutateTaskProjectionsCalculationMicroseconds); // Not clear why is it needed heee Block block_to_squash = projection.calculate(cur_block, ctx->context); - Chunk planned_chunk = projection_squash_plannings[i]->add({block_to_squash.getColumns(), block_to_squash.rows()}); + Chunk input_chunk = {block_to_squash.getColumns(), block_to_squash.rows()}; + Chunk planned_chunk = projection_squash_plannings[i]->add(input_chunk); if (planned_chunk.hasChunkInfo()) {