From 7fe4e675707c1e27edf2a06f3779768a483e6c21 Mon Sep 17 00:00:00 2001 From: Sema Checherinda Date: Tue, 21 May 2024 19:02:50 +0200 Subject: [PATCH] accept test 02457_insert_select_progress_http --- src/Interpreters/InterpreterInsertQuery.cpp | 56 +++++++++---------- src/Interpreters/SquashingTransform.cpp | 37 ++++++------ src/Interpreters/SquashingTransform.h | 7 +-- .../Transforms/SquashingChunksTransform.cpp | 13 +++-- .../MergeTree/ReplicatedMergeTreeSink.cpp | 7 --- 5 files changed, 54 insertions(+), 66 deletions(-) diff --git a/src/Interpreters/InterpreterInsertQuery.cpp b/src/Interpreters/InterpreterInsertQuery.cpp index 0f3df3752cb..339f68258dc 100644 --- a/src/Interpreters/InterpreterInsertQuery.cpp +++ b/src/Interpreters/InterpreterInsertQuery.cpp @@ -545,6 +545,34 @@ QueryPipeline InterpreterInsertQuery::buildInsertSelectPipeline(ASTInsertQuery & } } + auto actions_dag = ActionsDAG::makeConvertingActions( + pipeline.getHeader().getColumnsWithTypeAndName(), + query_sample_block.getColumnsWithTypeAndName(), + ActionsDAG::MatchColumnsMode::Position); + auto actions = std::make_shared(actions_dag, ExpressionActionsSettings::fromContext(getContext(), CompileExpressions::yes)); + + pipeline.addSimpleTransform([&](const Block & in_header) -> ProcessorPtr + { + return std::make_shared(in_header, actions); + }); + + /// We need to convert Sparse columns to full, because it's destination storage + /// may not support it or may have different settings for applying Sparse serialization. + pipeline.addSimpleTransform([&](const Block & in_header) -> ProcessorPtr + { + return std::make_shared(in_header); + }); + + pipeline.addSimpleTransform([&](const Block & in_header) -> ProcessorPtr + { + auto context_ptr = getContext(); + auto counting = std::make_shared(in_header, nullptr, context_ptr->getQuota()); + counting->setProcessListElement(context_ptr->getProcessListElement()); + counting->setProgressCallback(context_ptr->getProgressCallback()); + + return counting; + }); + pipeline.resize(1); if (shouldAddSquashingFroStorage(table)) @@ -595,34 +623,6 @@ QueryPipeline InterpreterInsertQuery::buildInsertSelectPipeline(ASTInsertQuery & pipeline.resize(presink_chains.size()); - auto actions_dag = ActionsDAG::makeConvertingActions( - pipeline.getHeader().getColumnsWithTypeAndName(), - query_sample_block.getColumnsWithTypeAndName(), - ActionsDAG::MatchColumnsMode::Position); - auto actions = std::make_shared(actions_dag, ExpressionActionsSettings::fromContext(getContext(), CompileExpressions::yes)); - - pipeline.addSimpleTransform([&](const Block & in_header) -> ProcessorPtr - { - return std::make_shared(in_header, actions); - }); - - /// We need to convert Sparse columns to full, because it's destination storage - /// may not support it or may have different settings for applying Sparse serialization. - pipeline.addSimpleTransform([&](const Block & in_header) -> ProcessorPtr - { - return std::make_shared(in_header); - }); - - pipeline.addSimpleTransform([&](const Block & in_header) -> ProcessorPtr - { - auto context_ptr = getContext(); - auto counting = std::make_shared(in_header, nullptr, context_ptr->getQuota()); - counting->setProcessListElement(context_ptr->getProcessListElement()); - counting->setProgressCallback(context_ptr->getProgressCallback()); - - return counting; - }); - for (auto & chain : presink_chains) pipeline.addResources(chain.detachResources()); pipeline.addChains(std::move(presink_chains)); diff --git a/src/Interpreters/SquashingTransform.cpp b/src/Interpreters/SquashingTransform.cpp index cf4f2060414..8a902add9a5 100644 --- a/src/Interpreters/SquashingTransform.cpp +++ b/src/Interpreters/SquashingTransform.cpp @@ -1,5 +1,6 @@ #include +#include namespace DB { @@ -16,23 +17,6 @@ SquashingTransform::SquashingTransform(size_t min_block_size_rows_, size_t min_b } SquashingTransform::SquashResult SquashingTransform::add(Block && input_block) -{ - return addImpl(std::move(input_block)); -} - -SquashingTransform::SquashResult SquashingTransform::add(const Block & input_block) -{ - return addImpl(input_block); -} - -/* - * To minimize copying, accept two types of argument: const reference for output - * stream, and rvalue reference for input stream, and decide whether to copy - * inside this function. This allows us not to copy Block unless we absolutely - * have to. - */ -template -SquashingTransform::SquashResult SquashingTransform::addImpl(ReferenceType input_block) { /// End of input stream. if (!input_block) @@ -66,7 +50,7 @@ SquashingTransform::SquashResult SquashingTransform::addImpl(ReferenceType input return SquashResult{std::move(to_return), true}; } - append(std::move(input_block)); + append(std::move(input_block)); if (isEnoughSize(accumulated_block)) { Block to_return; @@ -79,8 +63,7 @@ SquashingTransform::SquashResult SquashingTransform::addImpl(ReferenceType input } -template -void SquashingTransform::append(ReferenceType input_block) +void SquashingTransform::append(Block && input_block) { if (!accumulated_block) { @@ -88,6 +71,11 @@ void SquashingTransform::append(ReferenceType input_block) return; } + LOG_DEBUG(getLogger("SquashingTransform"), + "input_block rows {}, size {}, columns {}, accumulated_block rows {}, size {}, columns {}, ", + input_block.rows(), input_block.bytes(), input_block.columns(), + accumulated_block.rows(), accumulated_block.bytes(), accumulated_block.columns()); + assert(blocksHaveEqualStructure(input_block, accumulated_block)); try @@ -96,6 +84,15 @@ void SquashingTransform::append(ReferenceType input_block) { const auto source_column = input_block.getByPosition(i).column; + const auto acc_column = accumulated_block.getByPosition(i).column; + + LOG_DEBUG(getLogger("SquashingTransform"), + "column {} {}, acc rows {}, size {}, allocated {}, input rows {} size {} allocated {}", + i, source_column->getName(), + acc_column->size(), acc_column->byteSize(), acc_column->allocatedBytes(), + source_column->size(), source_column->byteSize(), source_column->allocatedBytes()); + + auto mutable_column = IColumn::mutate(std::move(accumulated_block.getByPosition(i).column)); mutable_column->insertRangeFrom(*source_column, 0, source_column->size()); accumulated_block.getByPosition(i).column = std::move(mutable_column); diff --git a/src/Interpreters/SquashingTransform.h b/src/Interpreters/SquashingTransform.h index f1eba537338..fff55a760db 100644 --- a/src/Interpreters/SquashingTransform.h +++ b/src/Interpreters/SquashingTransform.h @@ -34,7 +34,6 @@ public: * At end, you need to pass empty block. As the result for last (empty) block, you will get last Result with ready = true. */ SquashResult add(Block && block); - SquashResult add(const Block & block); private: size_t min_block_size_rows; @@ -42,11 +41,7 @@ private: Block accumulated_block; - template - SquashResult addImpl(ReferenceType block); - - template - void append(ReferenceType block); + void append(Block && block); bool isEnoughSize(const Block & block); bool isEnoughSize(size_t rows, size_t bytes) const; diff --git a/src/Processors/Transforms/SquashingChunksTransform.cpp b/src/Processors/Transforms/SquashingChunksTransform.cpp index 1a29b8d8a2d..ea0d63a2ed7 100644 --- a/src/Processors/Transforms/SquashingChunksTransform.cpp +++ b/src/Processors/Transforms/SquashingChunksTransform.cpp @@ -21,7 +21,7 @@ void SquashingChunksTransform::onConsume(Chunk chunk) "onConsume {}", chunk.getNumRows()); if (cur_chunkinfos.empty()) - cur_chunkinfos = chunk.getChunkInfos(); + cur_chunkinfos = chunk.getChunkInfos().clone(); auto result = squashing.add(getInputPort().getHeader().cloneWithColumns(chunk.detachColumns())); if (result.block) @@ -33,7 +33,7 @@ void SquashingChunksTransform::onConsume(Chunk chunk) if (cur_chunkinfos.empty() && result.input_block_delayed) { - cur_chunkinfos = chunk.getChunkInfos(); + cur_chunkinfos = chunk.getChunkInfos().clone(); } } @@ -79,12 +79,15 @@ SimpleSquashingChunksTransform::SimpleSquashingChunksTransform( void SimpleSquashingChunksTransform::transform(Chunk & chunk) { LOG_DEBUG(getLogger("SimpleSquashingChunksTransform"), - "transform {}, finished {}", chunk.getNumRows(), finished); + "transform rows {}, size {}, columns {}, infos: {}/{}, finished {}", + chunk.getNumRows(), chunk.bytes(), chunk.getNumColumns(), + chunk.getChunkInfos().size(), chunk.getChunkInfos().debug(), + finished); if (!finished) { if (cur_chunkinfos.empty()) - cur_chunkinfos = chunk.getChunkInfos(); + cur_chunkinfos = chunk.getChunkInfos().clone(); auto result = squashing.add(getInputPort().getHeader().cloneWithColumns(chunk.detachColumns())); if (result.block) @@ -96,7 +99,7 @@ void SimpleSquashingChunksTransform::transform(Chunk & chunk) if (cur_chunkinfos.empty() && result.input_block_delayed) { - cur_chunkinfos = chunk.getChunkInfos(); + cur_chunkinfos = chunk.getChunkInfos().clone(); } } else diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp index 41fdb86f3bd..11c64c97cb7 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp @@ -442,13 +442,6 @@ void ReplicatedMergeTreeSinkImpl::consume(Chunk & chunk) delayed_chunk = std::make_unique(); delayed_chunk->partitions = std::move(partitions); - /// If deduplicated data should not be inserted into MV, we need to set proper - /// value for `last_block_is_duplicate`, which is possible only after the part is committed. - /// Othervide we can delay commit. - /// TODO: we can also delay commit if there is no MVs. - // if (!settings.deduplicate_blocks_in_dependent_materialized_views) - // finishDelayedChunk(zookeeper); - ++num_blocks_processed; }