Merge pull request #14315 from azat/EXPLAIN-SIGSEGV-fix

[RFC] Fix QueryPlan lifetime (for EXPLAIN PIPELINE graph=1) for queries with nested interpreter
This commit is contained in:
Nikolai Kochetov 2020-09-02 12:46:59 +03:00 committed by GitHub
commit 811e44a937
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 34 additions and 7 deletions

View File

@ -13,6 +13,7 @@
#include <Processors/Transforms/ConvertingTransform.h>
#include <Processors/Sources/RemoteSource.h>
#include <Processors/Sources/DelayedSource.h>
#include <Processors/QueryPlan/QueryPlan.h>
namespace ProfileEvents
{
@ -68,14 +69,19 @@ SelectStreamFactory::SelectStreamFactory(
namespace
{
QueryPipeline createLocalStream(
auto createLocalPipe(
const ASTPtr & query_ast, const Block & header, const Context & context, QueryProcessingStage::Enum processed_stage)
{
checkStackSize();
InterpreterSelectQuery interpreter{query_ast, context, SelectQueryOptions(processed_stage)};
InterpreterSelectQuery interpreter(query_ast, context, SelectQueryOptions(processed_stage));
auto query_plan = std::make_unique<QueryPlan>();
auto pipeline = interpreter.execute().pipeline;
interpreter.buildQueryPlan(*query_plan);
auto pipeline = std::move(*query_plan->buildQueryPipeline());
/// Avoid going it out-of-scope for EXPLAIN
pipeline.addQueryPlan(std::move(query_plan));
pipeline.addSimpleTransform([&](const Block & source_header)
{
@ -94,7 +100,7 @@ QueryPipeline createLocalStream(
/// return std::make_shared<MaterializingBlockInputStream>(stream);
pipeline.setMaxThreads(1);
return pipeline;
return QueryPipeline::getPipe(std::move(pipeline));
}
String formattedAST(const ASTPtr & ast)
@ -130,7 +136,7 @@ void SelectStreamFactory::createForShard(
auto emplace_local_stream = [&]()
{
pipes.emplace_back(QueryPipeline::getPipe(createLocalStream(modified_query_ast, header, context, processed_stage)));
pipes.emplace_back(createLocalPipe(modified_query_ast, header, context, processed_stage));
};
String modified_query = formattedAST(modified_query_ast);
@ -270,7 +276,7 @@ void SelectStreamFactory::createForShard(
}
if (try_results.empty() || local_delay < max_remote_delay)
return QueryPipeline::getPipe(createLocalStream(modified_query_ast, header, context, stage));
return createLocalPipe(modified_query_ast, header, context, stage);
else
{
std::vector<IConnectionPool::Entry> connections;

View File

@ -269,7 +269,9 @@ BlockInputStreamPtr InterpreterExplainQuery::executeImpl()
if (settings.graph)
{
auto processors = Pipe::detachProcessors(QueryPipeline::getPipe(std::move(*pipeline)));
/// Pipe holds QueryPlan, should not go out-of-scope
auto pipe = QueryPipeline::getPipe(std::move(*pipeline));
const auto & processors = pipe.getProcessors();
if (settings.compact)
printPipelineCompact(processors, buffer, settings.query_pipeline_options.header);

View File

@ -102,6 +102,8 @@ Pipe::Holder & Pipe::Holder::operator=(Holder && rhs)
storage_holders.insert(storage_holders.end(), rhs.storage_holders.begin(), rhs.storage_holders.end());
interpreter_context.insert(interpreter_context.end(),
rhs.interpreter_context.begin(), rhs.interpreter_context.end());
for (auto & plan : rhs.query_plans)
query_plans.emplace_back(std::move(plan));
return *this;
}

View File

@ -1,6 +1,7 @@
#pragma once
#include <Processors/IProcessor.h>
#include <Processors/Sources/SourceWithProgress.h>
#include <Processors/QueryPlan/QueryPlan.h>
namespace DB
{
@ -8,6 +9,8 @@ namespace DB
class Pipe;
using Pipes = std::vector<Pipe>;
class QueryPipeline;
class IStorage;
using StoragePtr = std::shared_ptr<IStorage>;
@ -86,6 +89,8 @@ public:
/// Get processors from Pipe. Use it with cautious, it is easy to loss totals and extremes ports.
static Processors detachProcessors(Pipe pipe) { return std::move(pipe.processors); }
/// Get processors from Pipe w/o destroying pipe (used for EXPLAIN to keep QueryPlan).
const Processors & getProcessors() const { return processors; }
/// Specify quotas and limits for every ISourceWithProgress.
void setLimits(const SourceWithProgress::LocalLimits & limits);
@ -96,6 +101,8 @@ public:
/// This methods are from QueryPipeline. Needed to make conversion from pipeline to pipe possible.
void addInterpreterContext(std::shared_ptr<Context> context) { holder.interpreter_context.emplace_back(std::move(context)); }
void addStorageHolder(StoragePtr storage) { holder.storage_holders.emplace_back(std::move(storage)); }
/// For queries with nested interpreters (i.e. StorageDistributed)
void addQueryPlan(std::unique_ptr<QueryPlan> plan) { holder.query_plans.emplace_back(std::move(plan)); }
private:
/// Destruction order: processors, header, locks, temporary storages, local contexts
@ -113,6 +120,7 @@ private:
std::vector<std::shared_ptr<Context>> interpreter_context;
std::vector<StoragePtr> storage_holders;
std::vector<TableLockHolder> table_locks;
std::vector<std::unique_ptr<QueryPlan>> query_plans;
};
Holder holder;

View File

@ -21,6 +21,8 @@ class QueryPipelineProcessorsCollector;
struct AggregatingTransformParams;
using AggregatingTransformParamsPtr = std::shared_ptr<AggregatingTransformParams>;
class QueryPlan;
class QueryPipeline
{
public:
@ -93,6 +95,7 @@ public:
void addTableLock(const TableLockHolder & lock) { pipe.addTableLock(lock); }
void addInterpreterContext(std::shared_ptr<Context> context) { pipe.addInterpreterContext(std::move(context)); }
void addStorageHolder(StoragePtr storage) { pipe.addStorageHolder(std::move(storage)); }
void addQueryPlan(std::unique_ptr<QueryPlan> plan) { pipe.addQueryPlan(std::move(plan)); }
/// For compatibility with IBlockInputStream.
void setProgressCallback(const ProgressCallback & callback);

View File

@ -0,0 +1,6 @@
--
-- regressions
--
-- SIGSEGV regression due to QueryPlan lifetime
EXPLAIN PIPELINE graph=1 SELECT * FROM remote('127.{1,2}', system.one) FORMAT Null;