accept test 02457_insert_select_progress_http

This commit is contained in:
Sema Checherinda 2024-05-21 19:02:50 +02:00
parent ae124bf0b3
commit 7fe4e67570
5 changed files with 54 additions and 66 deletions

View File

@ -545,6 +545,34 @@ QueryPipeline InterpreterInsertQuery::buildInsertSelectPipeline(ASTInsertQuery &
}
}
auto actions_dag = ActionsDAG::makeConvertingActions(
pipeline.getHeader().getColumnsWithTypeAndName(),
query_sample_block.getColumnsWithTypeAndName(),
ActionsDAG::MatchColumnsMode::Position);
auto actions = std::make_shared<ExpressionActions>(actions_dag, ExpressionActionsSettings::fromContext(getContext(), CompileExpressions::yes));
pipeline.addSimpleTransform([&](const Block & in_header) -> ProcessorPtr
{
return std::make_shared<ExpressionTransform>(in_header, actions);
});
/// We need to convert Sparse columns to full, because it's destination storage
/// may not support it or may have different settings for applying Sparse serialization.
pipeline.addSimpleTransform([&](const Block & in_header) -> ProcessorPtr
{
return std::make_shared<MaterializingTransform>(in_header);
});
pipeline.addSimpleTransform([&](const Block & in_header) -> ProcessorPtr
{
auto context_ptr = getContext();
auto counting = std::make_shared<CountingTransform>(in_header, nullptr, context_ptr->getQuota());
counting->setProcessListElement(context_ptr->getProcessListElement());
counting->setProgressCallback(context_ptr->getProgressCallback());
return counting;
});
pipeline.resize(1);
if (shouldAddSquashingFroStorage(table))
@ -595,34 +623,6 @@ QueryPipeline InterpreterInsertQuery::buildInsertSelectPipeline(ASTInsertQuery &
pipeline.resize(presink_chains.size());
auto actions_dag = ActionsDAG::makeConvertingActions(
pipeline.getHeader().getColumnsWithTypeAndName(),
query_sample_block.getColumnsWithTypeAndName(),
ActionsDAG::MatchColumnsMode::Position);
auto actions = std::make_shared<ExpressionActions>(actions_dag, ExpressionActionsSettings::fromContext(getContext(), CompileExpressions::yes));
pipeline.addSimpleTransform([&](const Block & in_header) -> ProcessorPtr
{
return std::make_shared<ExpressionTransform>(in_header, actions);
});
/// We need to convert Sparse columns to full, because it's destination storage
/// may not support it or may have different settings for applying Sparse serialization.
pipeline.addSimpleTransform([&](const Block & in_header) -> ProcessorPtr
{
return std::make_shared<MaterializingTransform>(in_header);
});
pipeline.addSimpleTransform([&](const Block & in_header) -> ProcessorPtr
{
auto context_ptr = getContext();
auto counting = std::make_shared<CountingTransform>(in_header, nullptr, context_ptr->getQuota());
counting->setProcessListElement(context_ptr->getProcessListElement());
counting->setProgressCallback(context_ptr->getProgressCallback());
return counting;
});
for (auto & chain : presink_chains)
pipeline.addResources(chain.detachResources());
pipeline.addChains(std::move(presink_chains));

View File

@ -1,5 +1,6 @@
#include <Interpreters/SquashingTransform.h>
#include <Common/logger_useful.h>
namespace DB
{
@ -16,23 +17,6 @@ SquashingTransform::SquashingTransform(size_t min_block_size_rows_, size_t min_b
}
SquashingTransform::SquashResult SquashingTransform::add(Block && input_block)
{
return addImpl<Block &&>(std::move(input_block));
}
SquashingTransform::SquashResult SquashingTransform::add(const Block & input_block)
{
return addImpl<const Block &>(input_block);
}
/*
* To minimize copying, accept two types of argument: const reference for output
* stream, and rvalue reference for input stream, and decide whether to copy
* inside this function. This allows us not to copy Block unless we absolutely
* have to.
*/
template <typename ReferenceType>
SquashingTransform::SquashResult SquashingTransform::addImpl(ReferenceType input_block)
{
/// End of input stream.
if (!input_block)
@ -66,7 +50,7 @@ SquashingTransform::SquashResult SquashingTransform::addImpl(ReferenceType input
return SquashResult{std::move(to_return), true};
}
append<ReferenceType>(std::move(input_block));
append(std::move(input_block));
if (isEnoughSize(accumulated_block))
{
Block to_return;
@ -79,8 +63,7 @@ SquashingTransform::SquashResult SquashingTransform::addImpl(ReferenceType input
}
template <typename ReferenceType>
void SquashingTransform::append(ReferenceType input_block)
void SquashingTransform::append(Block && input_block)
{
if (!accumulated_block)
{
@ -88,6 +71,11 @@ void SquashingTransform::append(ReferenceType input_block)
return;
}
LOG_DEBUG(getLogger("SquashingTransform"),
"input_block rows {}, size {}, columns {}, accumulated_block rows {}, size {}, columns {}, ",
input_block.rows(), input_block.bytes(), input_block.columns(),
accumulated_block.rows(), accumulated_block.bytes(), accumulated_block.columns());
assert(blocksHaveEqualStructure(input_block, accumulated_block));
try
@ -96,6 +84,15 @@ void SquashingTransform::append(ReferenceType input_block)
{
const auto source_column = input_block.getByPosition(i).column;
const auto acc_column = accumulated_block.getByPosition(i).column;
LOG_DEBUG(getLogger("SquashingTransform"),
"column {} {}, acc rows {}, size {}, allocated {}, input rows {} size {} allocated {}",
i, source_column->getName(),
acc_column->size(), acc_column->byteSize(), acc_column->allocatedBytes(),
source_column->size(), source_column->byteSize(), source_column->allocatedBytes());
auto mutable_column = IColumn::mutate(std::move(accumulated_block.getByPosition(i).column));
mutable_column->insertRangeFrom(*source_column, 0, source_column->size());
accumulated_block.getByPosition(i).column = std::move(mutable_column);

View File

@ -34,7 +34,6 @@ public:
* At end, you need to pass empty block. As the result for last (empty) block, you will get last Result with ready = true.
*/
SquashResult add(Block && block);
SquashResult add(const Block & block);
private:
size_t min_block_size_rows;
@ -42,11 +41,7 @@ private:
Block accumulated_block;
template <typename ReferenceType>
SquashResult addImpl(ReferenceType block);
template <typename ReferenceType>
void append(ReferenceType block);
void append(Block && block);
bool isEnoughSize(const Block & block);
bool isEnoughSize(size_t rows, size_t bytes) const;

View File

@ -21,7 +21,7 @@ void SquashingChunksTransform::onConsume(Chunk chunk)
"onConsume {}", chunk.getNumRows());
if (cur_chunkinfos.empty())
cur_chunkinfos = chunk.getChunkInfos();
cur_chunkinfos = chunk.getChunkInfos().clone();
auto result = squashing.add(getInputPort().getHeader().cloneWithColumns(chunk.detachColumns()));
if (result.block)
@ -33,7 +33,7 @@ void SquashingChunksTransform::onConsume(Chunk chunk)
if (cur_chunkinfos.empty() && result.input_block_delayed)
{
cur_chunkinfos = chunk.getChunkInfos();
cur_chunkinfos = chunk.getChunkInfos().clone();
}
}
@ -79,12 +79,15 @@ SimpleSquashingChunksTransform::SimpleSquashingChunksTransform(
void SimpleSquashingChunksTransform::transform(Chunk & chunk)
{
LOG_DEBUG(getLogger("SimpleSquashingChunksTransform"),
"transform {}, finished {}", chunk.getNumRows(), finished);
"transform rows {}, size {}, columns {}, infos: {}/{}, finished {}",
chunk.getNumRows(), chunk.bytes(), chunk.getNumColumns(),
chunk.getChunkInfos().size(), chunk.getChunkInfos().debug(),
finished);
if (!finished)
{
if (cur_chunkinfos.empty())
cur_chunkinfos = chunk.getChunkInfos();
cur_chunkinfos = chunk.getChunkInfos().clone();
auto result = squashing.add(getInputPort().getHeader().cloneWithColumns(chunk.detachColumns()));
if (result.block)
@ -96,7 +99,7 @@ void SimpleSquashingChunksTransform::transform(Chunk & chunk)
if (cur_chunkinfos.empty() && result.input_block_delayed)
{
cur_chunkinfos = chunk.getChunkInfos();
cur_chunkinfos = chunk.getChunkInfos().clone();
}
}
else

View File

@ -442,13 +442,6 @@ void ReplicatedMergeTreeSinkImpl<async_insert>::consume(Chunk & chunk)
delayed_chunk = std::make_unique<ReplicatedMergeTreeSinkImpl::DelayedChunk>();
delayed_chunk->partitions = std::move(partitions);
/// If deduplicated data should not be inserted into MV, we need to set proper
/// value for `last_block_is_duplicate`, which is possible only after the part is committed.
/// Othervide we can delay commit.
/// TODO: we can also delay commit if there is no MVs.
// if (!settings.deduplicate_blocks_in_dependent_materialized_views)
// finishDelayedChunk(zookeeper);
++num_blocks_processed;
}