Fix some live view tests.

This commit is contained in:
Nikolai Kochetov 2021-09-13 19:23:56 +03:00
parent b37168e86f
commit 95b00f8fd4
4 changed files with 11 additions and 13 deletions

View File

@ -286,7 +286,8 @@ Chain buildPushingToViewsDrain(
ContextPtr context, ContextPtr context,
const ASTPtr & query_ptr, const ASTPtr & query_ptr,
bool no_destination, bool no_destination,
ExceptionKeepingTransformRuntimeDataPtr runtime_data) ExceptionKeepingTransformRuntimeDataPtr runtime_data,
const Block & lv_storage)
{ {
checkStackSize(); checkStackSize();
Chain result_chain; Chain result_chain;
@ -414,7 +415,7 @@ Chain buildPushingToViewsDrain(
type = QueryViewsLogElement::ViewType::LIVE; type = QueryViewsLogElement::ViewType::LIVE;
query = live_view->getInnerQuery(); // Used only to log in system.query_views_log query = live_view->getInnerQuery(); // Used only to log in system.query_views_log
out = buildPushingToViewsDrain( out = buildPushingToViewsDrain(
dependent_table, dependent_metadata_snapshot, insert_context, ASTPtr(), true, view_runtime_data); dependent_table, dependent_metadata_snapshot, insert_context, ASTPtr(), true, view_runtime_data, storage_header);
} }
else else
out = buildPushingToViewsDrain( out = buildPushingToViewsDrain(
@ -438,7 +439,7 @@ Chain buildPushingToViewsDrain(
nullptr, nullptr,
std::move(runtime_stats)}); std::move(runtime_stats)});
//if (type == QueryViewsLogElement::ViewType::MATERIALIZED) if (type == QueryViewsLogElement::ViewType::MATERIALIZED)
{ {
auto executing_inner_query = std::make_shared<ExecutingInnerQueryFromViewTransform>( auto executing_inner_query = std::make_shared<ExecutingInnerQueryFromViewTransform>(
storage_header, views_data->views.back(), views_data->source_storage_id, views_data->source_metadata_snapshot, views_data->source_storage); storage_header, views_data->views.back(), views_data->source_storage_id, views_data->source_metadata_snapshot, views_data->source_storage);
@ -497,7 +498,7 @@ Chain buildPushingToViewsDrain(
if (auto * live_view = dynamic_cast<StorageLiveView *>(storage.get())) if (auto * live_view = dynamic_cast<StorageLiveView *>(storage.get()))
{ {
auto sink = std::make_shared<PushingToLiveViewSink>(storage_header, *live_view, storage, context); auto sink = std::make_shared<PushingToLiveViewSink>(lv_storage, *live_view, storage, context);
sink->setRuntimeData(runtime_data); sink->setRuntimeData(runtime_data);
result_chain.addSource(std::move(sink)); result_chain.addSource(std::move(sink));
} }

View File

@ -51,7 +51,8 @@ Chain buildPushingToViewsDrain(
ContextPtr context, ContextPtr context,
const ASTPtr & query_ptr, const ASTPtr & query_ptr,
bool no_destination, bool no_destination,
ExceptionKeepingTransformRuntimeDataPtr runtime_data); ExceptionKeepingTransformRuntimeDataPtr runtime_data,
const Block & lv_storage = {});
class ExecutingInnerQueryFromViewTransform final : public ExceptionKeepingTransform class ExecutingInnerQueryFromViewTransform final : public ExceptionKeepingTransform

View File

@ -6,11 +6,6 @@
namespace DB namespace DB
{ {
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
PullingPipelineExecutor::PullingPipelineExecutor(QueryPipeline & pipeline_) : pipeline(pipeline_) PullingPipelineExecutor::PullingPipelineExecutor(QueryPipeline & pipeline_) : pipeline(pipeline_)
{ {
pulling_format = std::make_shared<PullingOutputFormat>(pipeline.getHeader(), has_data_flag); pulling_format = std::make_shared<PullingOutputFormat>(pipeline.getHeader(), has_data_flag);

View File

@ -16,6 +16,7 @@ limitations under the License. */
#include <Interpreters/Context.h> #include <Interpreters/Context.h>
#include <Interpreters/InterpreterSelectQuery.h> #include <Interpreters/InterpreterSelectQuery.h>
#include <Processors/Sources/BlocksSource.h> #include <Processors/Sources/BlocksSource.h>
#include <Processors/Sinks/ExceptionHandlingSink.h>
#include <Processors/Transforms/MaterializingTransform.h> #include <Processors/Transforms/MaterializingTransform.h>
#include <Processors/Executors/PullingPipelineExecutor.h> #include <Processors/Executors/PullingPipelineExecutor.h>
#include <Processors/Transforms/SquashingChunksTransform.h> #include <Processors/Transforms/SquashingChunksTransform.h>
@ -261,10 +262,10 @@ void StorageLiveView::writeIntoLiveView(
} }
auto pipeline = live_view.completeQuery(std::move(from)); auto pipeline = live_view.completeQuery(std::move(from));
pipeline.resize(1); pipeline.addChain(Chain(std::move(output)));
pipeline.setSinks([&](const Block &, Pipe::StreamType) pipeline.setSinks([&](const Block & cur_header, Pipe::StreamType)
{ {
return std::move(output); return std::make_shared<ExceptionHandlingSink>(cur_header);
}); });
auto executor = pipeline.execute(); auto executor = pipeline.execute();