From 95b00f8fd4464bf2b394e252e25dbea766ad6442 Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Mon, 13 Sep 2021 19:23:56 +0300 Subject: [PATCH] Fix some live view tests. --- src/DataStreams/PushingToViewsBlockOutputStream.cpp | 9 +++++---- src/DataStreams/PushingToViewsBlockOutputStream.h | 3 ++- src/Processors/Executors/PullingPipelineExecutor.cpp | 5 ----- src/Storages/LiveView/StorageLiveView.cpp | 7 ++++--- 4 files changed, 11 insertions(+), 13 deletions(-) diff --git a/src/DataStreams/PushingToViewsBlockOutputStream.cpp b/src/DataStreams/PushingToViewsBlockOutputStream.cpp index 6201a06c74d..5f82104d914 100644 --- a/src/DataStreams/PushingToViewsBlockOutputStream.cpp +++ b/src/DataStreams/PushingToViewsBlockOutputStream.cpp @@ -286,7 +286,8 @@ Chain buildPushingToViewsDrain( ContextPtr context, const ASTPtr & query_ptr, bool no_destination, - ExceptionKeepingTransformRuntimeDataPtr runtime_data) + ExceptionKeepingTransformRuntimeDataPtr runtime_data, + const Block & lv_storage) { checkStackSize(); Chain result_chain; @@ -414,7 +415,7 @@ Chain buildPushingToViewsDrain( type = QueryViewsLogElement::ViewType::LIVE; query = live_view->getInnerQuery(); // Used only to log in system.query_views_log out = buildPushingToViewsDrain( - dependent_table, dependent_metadata_snapshot, insert_context, ASTPtr(), true, view_runtime_data); + dependent_table, dependent_metadata_snapshot, insert_context, ASTPtr(), true, view_runtime_data, storage_header); } else out = buildPushingToViewsDrain( @@ -438,7 +439,7 @@ Chain buildPushingToViewsDrain( nullptr, std::move(runtime_stats)}); - //if (type == QueryViewsLogElement::ViewType::MATERIALIZED) + if (type == QueryViewsLogElement::ViewType::MATERIALIZED) { auto executing_inner_query = std::make_shared( storage_header, views_data->views.back(), views_data->source_storage_id, views_data->source_metadata_snapshot, views_data->source_storage); @@ -497,7 +498,7 @@ Chain buildPushingToViewsDrain( if (auto * live_view = dynamic_cast(storage.get())) { - auto sink = std::make_shared(storage_header, *live_view, storage, context); + auto sink = std::make_shared(lv_storage, *live_view, storage, context); sink->setRuntimeData(runtime_data); result_chain.addSource(std::move(sink)); } diff --git a/src/DataStreams/PushingToViewsBlockOutputStream.h b/src/DataStreams/PushingToViewsBlockOutputStream.h index 2bf1f79b556..96ccc0ac5b6 100644 --- a/src/DataStreams/PushingToViewsBlockOutputStream.h +++ b/src/DataStreams/PushingToViewsBlockOutputStream.h @@ -51,7 +51,8 @@ Chain buildPushingToViewsDrain( ContextPtr context, const ASTPtr & query_ptr, bool no_destination, - ExceptionKeepingTransformRuntimeDataPtr runtime_data); + ExceptionKeepingTransformRuntimeDataPtr runtime_data, + const Block & lv_storage = {}); class ExecutingInnerQueryFromViewTransform final : public ExceptionKeepingTransform diff --git a/src/Processors/Executors/PullingPipelineExecutor.cpp b/src/Processors/Executors/PullingPipelineExecutor.cpp index 870787a3806..dc59e0a2f5a 100644 --- a/src/Processors/Executors/PullingPipelineExecutor.cpp +++ b/src/Processors/Executors/PullingPipelineExecutor.cpp @@ -6,11 +6,6 @@ namespace DB { -namespace ErrorCodes -{ - extern const int LOGICAL_ERROR; -} - PullingPipelineExecutor::PullingPipelineExecutor(QueryPipeline & pipeline_) : pipeline(pipeline_) { pulling_format = std::make_shared(pipeline.getHeader(), has_data_flag); diff --git a/src/Storages/LiveView/StorageLiveView.cpp b/src/Storages/LiveView/StorageLiveView.cpp index 69390850ccc..d6da59b596f 100644 --- a/src/Storages/LiveView/StorageLiveView.cpp +++ b/src/Storages/LiveView/StorageLiveView.cpp @@ -16,6 +16,7 @@ limitations under the License. */ #include #include #include +#include #include #include #include @@ -261,10 +262,10 @@ void StorageLiveView::writeIntoLiveView( } auto pipeline = live_view.completeQuery(std::move(from)); - pipeline.resize(1); - pipeline.setSinks([&](const Block &, Pipe::StreamType) + pipeline.addChain(Chain(std::move(output))); + pipeline.setSinks([&](const Block & cur_header, Pipe::StreamType) { - return std::move(output); + return std::make_shared(cur_header); }); auto executor = pipeline.execute();