Allow to use pulling executor in materialized views

This commit is contained in:
alesapin 2024-11-26 19:39:34 +01:00
parent f7795a638d
commit 6e9e0c82be
3 changed files with 32 additions and 8 deletions

View File

@ -5521,6 +5521,8 @@ The default value is `CURRENT_USER`.
DECLARE(UInt64, cache_warmer_threads, 4, R"(
Only available in ClickHouse Cloud. Number of background threads for speculatively downloading new data parts into file cache, when cache_populated_by_fetch is enabled. Zero to disable.
)", 0) \
DECLARE(Bool, use_async_executor_for_materialized_views, false, R"(
Use async and potentially multithreaded execution of materialized view query, can speedup views processing during INSERT, but also consume more memory.)", 0) \
DECLARE(Int64, ignore_cold_parts_seconds, 0, R"(
Only available in ClickHouse Cloud. Exclude new data parts from SELECT queries until they're either pre-warmed (see cache_populated_by_fetch) or this many seconds old. Only for Replicated-/SharedMergeTree.
)", 0) \

View File

@ -11,6 +11,7 @@
#include <Processors/Transforms/PlanSquashingTransform.h>
#include <Processors/Transforms/SquashingTransform.h>
#include <Processors/Transforms/ExpressionTransform.h>
#include <Processors/Executors/PullingAsyncPipelineExecutor.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Storages/LiveView/StorageLiveView.h>
#include <Storages/WindowView/StorageWindowView.h>
@ -63,6 +64,7 @@ namespace Setting
extern const SettingsUInt64 min_insert_block_size_rows_for_materialized_views;
extern const SettingsBool parallel_view_processing;
extern const SettingsBool use_concurrency_control;
extern const SettingsBool use_async_executor_for_materialized_views;
}
namespace ErrorCodes
@ -129,6 +131,7 @@ private:
};
/// For source chunk, execute view query over it.
template <typename Executor>
class ExecutingInnerQueryFromViewTransform final : public ExceptionKeepingTransform
{
public:
@ -148,7 +151,7 @@ private:
struct State
{
QueryPipeline pipeline;
PullingPipelineExecutor executor;
Executor executor;
explicit State(QueryPipeline pipeline_)
: pipeline(std::move(pipeline_))
@ -428,17 +431,31 @@ std::optional<Chain> generateViewChain(
out.addSource(std::make_shared<DeduplicationToken::CheckTokenTransform>("Right after Inner query", out.getInputHeader()));
#endif
auto executing_inner_query = std::make_shared<ExecutingInnerQueryFromViewTransform>(
storage_header, views_data->views.back(), views_data, disable_deduplication_for_children);
executing_inner_query->setRuntimeData(view_thread_status, view_counter_ms);
if (context->getSettingsRef()[Setting::use_async_executor_for_materialized_views])
{
auto executing_inner_query = std::make_shared<ExecutingInnerQueryFromViewTransform<PullingAsyncPipelineExecutor>>(
storage_header, views_data->views.back(), views_data, disable_deduplication_for_children);
executing_inner_query->setRuntimeData(view_thread_status, view_counter_ms);
out.addSource(std::move(executing_inner_query));
out.addSource(std::move(executing_inner_query));
}
else
{
auto executing_inner_query = std::make_shared<ExecutingInnerQueryFromViewTransform<PullingPipelineExecutor>>(
storage_header, views_data->views.back(), views_data, disable_deduplication_for_children);
executing_inner_query->setRuntimeData(view_thread_status, view_counter_ms);
out.addSource(std::move(executing_inner_query));
}
#ifdef ABORT_ON_LOGICAL_ERROR
out.addSource(std::make_shared<DeduplicationToken::CheckTokenTransform>("Right before Inner query", out.getInputHeader()));
#endif
}
return out;
}
@ -766,7 +783,8 @@ IProcessor::Status CopyingDataToViewsTransform::prepare()
}
ExecutingInnerQueryFromViewTransform::ExecutingInnerQueryFromViewTransform(
template <typename Executor>
ExecutingInnerQueryFromViewTransform<Executor>::ExecutingInnerQueryFromViewTransform(
const Block & header,
ViewRuntimeData & view_,
std::shared_ptr<ViewsData> views_data_,
@ -778,14 +796,16 @@ ExecutingInnerQueryFromViewTransform::ExecutingInnerQueryFromViewTransform(
{
}
void ExecutingInnerQueryFromViewTransform::onConsume(Chunk chunk)
template <typename Executor>
void ExecutingInnerQueryFromViewTransform<Executor>::onConsume(Chunk chunk)
{
auto block = getInputPort().getHeader().cloneWithColumns(chunk.detachColumns());
state.emplace(process(std::move(block), view, *views_data, std::move(chunk.getChunkInfos()), disable_deduplication_for_children));
}
ExecutingInnerQueryFromViewTransform::GenerateResult ExecutingInnerQueryFromViewTransform::onGenerate()
template <typename Executor>
ExecutingInnerQueryFromViewTransform<Executor>::GenerateResult ExecutingInnerQueryFromViewTransform<Executor>::onGenerate()
{
GenerateResult res;
if (!state.has_value())

View File

@ -2,6 +2,8 @@ DROP TABLE IF EXISTS src;
DROP TABLE IF EXISTS dst;
DROP TABLE IF EXISTS matview;
SET use_async_executor_for_materialized_views=1;
CREATE TABLE src (
event_time DateTime,
key UInt64,