diff --git a/src/Interpreters/ClusterProxy/SelectStreamFactory.cpp b/src/Interpreters/ClusterProxy/SelectStreamFactory.cpp index 986de85d712..ed7bd2cf71f 100644 --- a/src/Interpreters/ClusterProxy/SelectStreamFactory.cpp +++ b/src/Interpreters/ClusterProxy/SelectStreamFactory.cpp @@ -13,6 +13,7 @@ #include #include #include +#include namespace ProfileEvents { @@ -68,14 +69,19 @@ SelectStreamFactory::SelectStreamFactory( namespace { -QueryPipeline createLocalStream( +auto createLocalPipe( const ASTPtr & query_ast, const Block & header, const Context & context, QueryProcessingStage::Enum processed_stage) { checkStackSize(); - InterpreterSelectQuery interpreter{query_ast, context, SelectQueryOptions(processed_stage)}; + InterpreterSelectQuery interpreter(query_ast, context, SelectQueryOptions(processed_stage)); + auto query_plan = std::make_unique(); - auto pipeline = interpreter.execute().pipeline; + interpreter.buildQueryPlan(*query_plan); + auto pipeline = std::move(*query_plan->buildQueryPipeline()); + + /// Avoid going it out-of-scope for EXPLAIN + pipeline.addQueryPlan(std::move(query_plan)); pipeline.addSimpleTransform([&](const Block & source_header) { @@ -94,7 +100,7 @@ QueryPipeline createLocalStream( /// return std::make_shared(stream); pipeline.setMaxThreads(1); - return pipeline; + return QueryPipeline::getPipe(std::move(pipeline)); } String formattedAST(const ASTPtr & ast) @@ -130,7 +136,7 @@ void SelectStreamFactory::createForShard( auto emplace_local_stream = [&]() { - pipes.emplace_back(QueryPipeline::getPipe(createLocalStream(modified_query_ast, header, context, processed_stage))); + pipes.emplace_back(createLocalPipe(modified_query_ast, header, context, processed_stage)); }; String modified_query = formattedAST(modified_query_ast); @@ -270,7 +276,7 @@ void SelectStreamFactory::createForShard( } if (try_results.empty() || local_delay < max_remote_delay) - return QueryPipeline::getPipe(createLocalStream(modified_query_ast, header, context, stage)); + return createLocalPipe(modified_query_ast, header, context, stage); else { std::vector connections; diff --git a/src/Interpreters/InterpreterExplainQuery.cpp b/src/Interpreters/InterpreterExplainQuery.cpp index 9960509a5d7..c936556ce39 100644 --- a/src/Interpreters/InterpreterExplainQuery.cpp +++ b/src/Interpreters/InterpreterExplainQuery.cpp @@ -269,7 +269,9 @@ BlockInputStreamPtr InterpreterExplainQuery::executeImpl() if (settings.graph) { - auto processors = Pipe::detachProcessors(QueryPipeline::getPipe(std::move(*pipeline))); + /// Pipe holds QueryPlan, should not go out-of-scope + auto pipe = QueryPipeline::getPipe(std::move(*pipeline)); + const auto & processors = pipe.getProcessors(); if (settings.compact) printPipelineCompact(processors, buffer, settings.query_pipeline_options.header); diff --git a/src/Processors/Pipe.cpp b/src/Processors/Pipe.cpp index 93dcd561c00..d28e54dae58 100644 --- a/src/Processors/Pipe.cpp +++ b/src/Processors/Pipe.cpp @@ -102,6 +102,8 @@ Pipe::Holder & Pipe::Holder::operator=(Holder && rhs) storage_holders.insert(storage_holders.end(), rhs.storage_holders.begin(), rhs.storage_holders.end()); interpreter_context.insert(interpreter_context.end(), rhs.interpreter_context.begin(), rhs.interpreter_context.end()); + for (auto & plan : rhs.query_plans) + query_plans.emplace_back(std::move(plan)); return *this; } diff --git a/src/Processors/Pipe.h b/src/Processors/Pipe.h index 28b64937aeb..f5f8b117db9 100644 --- a/src/Processors/Pipe.h +++ b/src/Processors/Pipe.h @@ -1,6 +1,7 @@ #pragma once #include #include +#include namespace DB { @@ -8,6 +9,8 @@ namespace DB class Pipe; using Pipes = std::vector; +class QueryPipeline; + class IStorage; using StoragePtr = std::shared_ptr; @@ -86,6 +89,8 @@ public: /// Get processors from Pipe. Use it with cautious, it is easy to loss totals and extremes ports. static Processors detachProcessors(Pipe pipe) { return std::move(pipe.processors); } + /// Get processors from Pipe w/o destroying pipe (used for EXPLAIN to keep QueryPlan). + const Processors & getProcessors() const { return processors; } /// Specify quotas and limits for every ISourceWithProgress. void setLimits(const SourceWithProgress::LocalLimits & limits); @@ -96,6 +101,8 @@ public: /// This methods are from QueryPipeline. Needed to make conversion from pipeline to pipe possible. void addInterpreterContext(std::shared_ptr context) { holder.interpreter_context.emplace_back(std::move(context)); } void addStorageHolder(StoragePtr storage) { holder.storage_holders.emplace_back(std::move(storage)); } + /// For queries with nested interpreters (i.e. StorageDistributed) + void addQueryPlan(std::unique_ptr plan) { holder.query_plans.emplace_back(std::move(plan)); } private: /// Destruction order: processors, header, locks, temporary storages, local contexts @@ -113,6 +120,7 @@ private: std::vector> interpreter_context; std::vector storage_holders; std::vector table_locks; + std::vector> query_plans; }; Holder holder; diff --git a/src/Processors/QueryPipeline.h b/src/Processors/QueryPipeline.h index 385cf77198e..94de753bebc 100644 --- a/src/Processors/QueryPipeline.h +++ b/src/Processors/QueryPipeline.h @@ -21,6 +21,8 @@ class QueryPipelineProcessorsCollector; struct AggregatingTransformParams; using AggregatingTransformParamsPtr = std::shared_ptr; +class QueryPlan; + class QueryPipeline { public: @@ -93,6 +95,7 @@ public: void addTableLock(const TableLockHolder & lock) { pipe.addTableLock(lock); } void addInterpreterContext(std::shared_ptr context) { pipe.addInterpreterContext(std::move(context)); } void addStorageHolder(StoragePtr storage) { pipe.addStorageHolder(std::move(storage)); } + void addQueryPlan(std::unique_ptr plan) { pipe.addQueryPlan(std::move(plan)); } /// For compatibility with IBlockInputStream. void setProgressCallback(const ProgressCallback & callback); diff --git a/tests/queries/0_stateless/01470_explain.reference b/tests/queries/0_stateless/01470_explain.reference new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/queries/0_stateless/01470_explain.sql b/tests/queries/0_stateless/01470_explain.sql new file mode 100644 index 00000000000..8fd145e7f65 --- /dev/null +++ b/tests/queries/0_stateless/01470_explain.sql @@ -0,0 +1,6 @@ +-- +-- regressions +-- + +-- SIGSEGV regression due to QueryPlan lifetime +EXPLAIN PIPELINE graph=1 SELECT * FROM remote('127.{1,2}', system.one) FORMAT Null;