Merge pull request #59899 from ClickHouse/refactor-squashing-for-inserts

Refactor Squashing for inserts.
This commit is contained in:
Nikolai Kochetov 2024-02-14 19:29:11 +01:00 committed by GitHub
commit 9c626f9c36
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 81 additions and 34 deletions

View File

@ -274,7 +274,7 @@ Chain InterpreterInsertQuery::buildChain(
auto sample = getSampleBlock(columns, table, metadata_snapshot);
Chain sink = buildSink(table, metadata_snapshot, thread_status_holder, running_group, elapsed_counter_ms);
Chain chain = buildPreSinkChain(sink.getInputHeader(), table, metadata_snapshot, sample, thread_status_holder);
Chain chain = buildPreSinkChain(sink.getInputHeader(), table, metadata_snapshot, sample);
chain.appendChain(std::move(sink));
return chain;
@ -317,25 +317,31 @@ Chain InterpreterInsertQuery::buildSink(
return out;
}
bool InterpreterInsertQuery::shouldAddSquashingFroStorage(const StoragePtr & table) const
{
auto context_ptr = getContext();
const Settings & settings = context_ptr->getSettingsRef();
const ASTInsertQuery * query = nullptr;
if (query_ptr)
query = query_ptr->as<ASTInsertQuery>();
/// Do not squash blocks if it is a sync INSERT into Distributed, since it lead to double bufferization on client and server side.
/// Client-side bufferization might cause excessive timeouts (especially in case of big blocks).
return !(settings.distributed_foreground_insert && table->isRemote()) && !async_insert && !no_squash && !(query && query->watch);
}
Chain InterpreterInsertQuery::buildPreSinkChain(
const Block & subsequent_header,
const StoragePtr & table,
const StorageMetadataPtr & metadata_snapshot,
const Block & query_sample_block,
ThreadStatusesHolderPtr thread_status_holder)
const Block & query_sample_block)
{
ThreadStatus * thread_status = current_thread;
if (!thread_status_holder)
thread_status = nullptr;
auto context_ptr = getContext();
const ASTInsertQuery * query = nullptr;
if (query_ptr)
query = query_ptr->as<ASTInsertQuery>();
const Settings & settings = context_ptr->getSettingsRef();
bool null_as_default = query && query->select && context_ptr->getSettingsRef().insert_null_as_default;
/// We create a pipeline of several streams, into which we will write data.
@ -366,26 +372,6 @@ Chain InterpreterInsertQuery::buildPreSinkChain(
/// because some clients break insertion protocol (columns != header)
out.addSource(std::make_shared<ConvertingTransform>(query_sample_block, adding_missing_defaults_actions));
/// It's important to squash blocks as early as possible (before other transforms),
/// because other transforms may work inefficiently if the block size is small.
/// Do not squash blocks if it is a sync INSERT into Distributed, since it leads to double buffering on client and server side.
/// Client-side bufferization might cause excessive timeouts (especially in case of big blocks).
if (!(settings.distributed_foreground_insert && table->isRemote()) && !async_insert && !no_squash && !(query && query->watch))
{
bool table_prefers_large_blocks = table->prefersLargeBlocks();
out.addSource(std::make_shared<SquashingChunksTransform>(
input_header(),
table_prefers_large_blocks ? settings.min_insert_block_size_rows : settings.max_block_size,
table_prefers_large_blocks ? settings.min_insert_block_size_bytes : 0ULL));
}
auto counting = std::make_shared<CountingTransform>(input_header(), thread_status, getContext()->getQuota());
counting->setProcessListElement(context_ptr->getProcessListElement());
counting->setProgressCallback(context_ptr->getProgressCallback());
out.addSource(std::move(counting));
return out;
}
@ -558,8 +544,7 @@ BlockIO InterpreterInsertQuery::execute()
}
for (size_t i = 0; i < pre_streams_size; ++i)
{
auto out = buildPreSinkChain(sink_chains[0].getInputHeader(), table, metadata_snapshot,
query_sample_block, /* thread_status_holder= */ nullptr);
auto out = buildPreSinkChain(sink_chains[0].getInputHeader(), table, metadata_snapshot, query_sample_block);
presink_chains.emplace_back(std::move(out));
}
}
@ -592,6 +577,29 @@ BlockIO InterpreterInsertQuery::execute()
return std::make_shared<MaterializingTransform>(in_header);
});
pipeline.addSimpleTransform([&](const Block & in_header) -> ProcessorPtr
{
auto context_ptr = getContext();
auto counting = std::make_shared<CountingTransform>(in_header, nullptr, context_ptr->getQuota());
counting->setProcessListElement(context_ptr->getProcessListElement());
counting->setProgressCallback(context_ptr->getProgressCallback());
return counting;
});
if (shouldAddSquashingFroStorage(table))
{
bool table_prefers_large_blocks = table->prefersLargeBlocks();
pipeline.addSimpleTransform([&](const Block & in_header) -> ProcessorPtr
{
return std::make_shared<SimpleSquashingChunksTransform>(
in_header,
table_prefers_large_blocks ? settings.min_insert_block_size_rows : settings.max_block_size,
table_prefers_large_blocks ? settings.min_insert_block_size_bytes : 0ULL);
});
}
size_t num_select_threads = pipeline.getNumThreads();
for (auto & chain : presink_chains)
@ -634,7 +642,27 @@ BlockIO InterpreterInsertQuery::execute()
}
else
{
presink_chains.at(0).appendChain(std::move(sink_chains.at(0)));
auto & chain = presink_chains.at(0);
chain.appendChain(std::move(sink_chains.at(0)));
if (shouldAddSquashingFroStorage(table))
{
bool table_prefers_large_blocks = table->prefersLargeBlocks();
auto squashing = std::make_shared<SimpleSquashingChunksTransform>(
chain.getInputHeader(),
table_prefers_large_blocks ? settings.min_insert_block_size_rows : settings.max_block_size,
table_prefers_large_blocks ? settings.min_insert_block_size_bytes : 0ULL);
chain.addSource(std::move(squashing));
}
auto context_ptr = getContext();
auto counting = std::make_shared<CountingTransform>(chain.getInputHeader(), nullptr, context_ptr->getQuota());
counting->setProcessListElement(context_ptr->getProcessListElement());
counting->setProgressCallback(context_ptr->getProgressCallback());
chain.addSource(std::move(counting));
res.pipeline = QueryPipeline(std::move(presink_chains[0]));
res.pipeline.setNumThreads(std::min<size_t>(res.pipeline.getNumThreads(), settings.max_threads));
res.pipeline.setConcurrencyControl(settings.use_concurrency_control);

View File

@ -59,6 +59,8 @@ public:
void addBuffer(std::unique_ptr<ReadBuffer> buffer) { owned_buffers.push_back(std::move(buffer)); }
bool shouldAddSquashingFroStorage(const StoragePtr & table) const;
private:
Block getSampleBlock(const Names & names, const StoragePtr & table, const StorageMetadataPtr & metadata_snapshot) const;
@ -81,8 +83,7 @@ private:
const Block & subsequent_header,
const StoragePtr & table,
const StorageMetadataPtr & metadata_snapshot,
const Block & query_sample_block,
ThreadStatusesHolderPtr thread_status_holder);
const Block & query_sample_block);
};

View File

@ -5,6 +5,7 @@
#include <Interpreters/InterpreterSelectQuery.h>
#include <Interpreters/InterpreterSelectQueryAnalyzer.h>
#include <Parsers/ASTInsertQuery.h>
#include <Processors/Transforms/CountingTransform.h>
#include <Processors/Transforms/SquashingChunksTransform.h>
#include <Processors/Transforms/ExpressionTransform.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
@ -412,6 +413,23 @@ Chain buildPushingToViewsChain(
InterpreterInsertQuery interpreter(nullptr, view_insert_context, false, false, false);
out = interpreter.buildChain(inner_table, inner_metadata_snapshot, insert_columns, thread_status_holder, view_counter_ms);
if (interpreter.shouldAddSquashingFroStorage(inner_table))
{
bool table_prefers_large_blocks = inner_table->prefersLargeBlocks();
const auto & settings = view_insert_context->getSettingsRef();
out.addSource(std::make_shared<SquashingChunksTransform>(
out.getInputHeader(),
table_prefers_large_blocks ? settings.min_insert_block_size_rows : settings.max_block_size,
table_prefers_large_blocks ? settings.min_insert_block_size_bytes : 0ULL));
}
auto counting = std::make_shared<CountingTransform>(out.getInputHeader(), current_thread, view_insert_context->getQuota());
counting->setProcessListElement(view_insert_context->getProcessListElement());
counting->setProgressCallback(view_insert_context->getProgressCallback());
out.addSource(std::move(counting));
out.addStorageHolder(view);
out.addStorageHolder(inner_table);
}