Fix QueryPlan lifetime (for EXPLAIN PIPELINE graph=1) for queries with nested interpreter

Example of such queries are distributed queries, which creates local
InterpreterSelectQuery, which will have it's own QueryPlan but returns
Pipes that has that IQueryPlanStep attached.

After EXPLAIN PIPELINE graph=1 tries to use them, and will get SIGSEGV.

- TSAN:

<details>

```
==2782113==ERROR: AddressSanitizer: heap-use-after-free on address 0x6120000223c0 at pc 0x00002b8f3f3e bp 0x7fff18cfbff0 sp 0x7fff18cfbfe8
READ of size 8 at 0x6120000223c0 thread T22 (TCPHandler)
    #0 0x2b8f3f3d in DB::printPipelineCompact(std::__1::vector<std::__1::shared_ptr<DB::IProcessor>, std::__1::allocator<std::__1::shared_ptr<DB::IProcessor> > > const&, DB::WriteBuffer&, bool) /build/obj-x86_64-linux-gnu/../src/Processors/printPipeline.cpp:116:53
    #1 0x29ee698c in DB::InterpreterExplainQuery::executeImpl() /build/obj-x86_64-linux-gnu/../src/Interpreters/InterpreterExplainQuery.cpp:275:17
    #2 0x29ee2e40 in DB::InterpreterExplainQuery::execute() /build/obj-x86_64-linux-gnu/../src/Interpreters/InterpreterExplainQuery.cpp:73:14
    #3 0x2a7b44a2 in DB::executeQueryImpl(char const*, char const*, DB::Context&, bool, DB::QueryProcessingStage::Enum, bool, DB::ReadBuffer*) /build/obj-x86_64-linux-gnu/../src/Interpreters/executeQuery.cpp:389:28
    #4 0x2a7b1cb3 in DB::executeQuery(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, DB::Context&, bool, DB::QueryProcessingStage::Enum, bool) /build/obj-x86_64-linux-gnu/../src/Interpreters/executeQuery.cpp:675:30
    #5 0x2b7993b2 in DB::TCPHandler::runImpl() /build/obj-x86_64-linux-gnu/../src/Server/TCPHandler.cpp:253:24
    #6 0x2b7b649a in DB::TCPHandler::run() /build/obj-x86_64-linux-gnu/../src/Server/TCPHandler.cpp:1217:9
    #7 0x31d9c57e in Poco::Net::TCPServerConnection::start() /build/obj-x86_64-linux-gnu/../contrib/poco/Net/src/TCPServerConnection.cpp:43:3
    #8 0x31d9d281 in Poco::Net::TCPServerDispatcher::run() /build/obj-x86_64-linux-gnu/../contrib/poco/Net/src/TCPServerDispatcher.cpp:114:20
    #9 0x3206b5d5 in Poco::PooledThread::run() /build/obj-x86_64-linux-gnu/../contrib/poco/Foundation/src/ThreadPool.cpp:199:14
    #10 0x320657ad in Poco::ThreadImpl::runnableEntry(void*) /build/obj-x86_64-linux-gnu/../contrib/poco/Foundation/src/Thread_POSIX.cpp:345:27
    #11 0x7ffff7f853e8 in start_thread (/usr/lib/libpthread.so.0+0x93e8)
    #12 0x7ffff7ea2292 in clone (/usr/lib/libc.so.6+0x100292)

0x6120000223c0 is located 0 bytes inside of 272-byte region [0x6120000223c0,0x6120000224d0)
freed by thread T22 (TCPHandler) here:
    #0 0x122f3b62 in operator delete(void*, unsigned long) (/src/ch/tmp/master-20200831/clickhouse+0x122f3b62)
    #1 0x2bd9e9fa in std::__1::default_delete<DB::IQueryPlanStep>::operator()(DB::IQueryPlanStep*) const /build/obj-x86_64-linux-gnu/../contrib/libcxx/include/memory:2363:5
    #2 0x2bd9e9fa in std::__1::unique_ptr<DB::IQueryPlanStep, std::__1::default_delete<DB::IQueryPlanStep> >::reset(DB::IQueryPlanStep*) /build/obj-x86_64-linux-gnu/../contrib/libcxx/include/memory:2618:7
    #3 0x2bd9e9fa in std::__1::unique_ptr<DB::IQueryPlanStep, std::__1::default_delete<DB::IQueryPlanStep> >::~unique_ptr() /build/obj-x86_64-linux-gnu/../contrib/libcxx/include/memory:2572:19
    #4 0x2bd9e9fa in DB::QueryPlan::Node::~Node() /build/obj-x86_64-linux-gnu/../src/Processors/QueryPlan/QueryPlan.h:66:12
    #5 0x2bd9e9fa in void std::__1::allocator_traits<std::__1::allocator<std::__1::__list_node<DB::QueryPlan::Node, void*> > >::__destroy<DB::QueryPlan::Node>(std::__1::integral_constant<bool, false>, std::__1::allocator<std::__1::__list_node<DB::QueryPlan::Node, void*> >&,
 DB::QueryPlan::Node*) /build/obj-x86_64-linux-gnu/../contrib/libcxx/include/memory:1798:23
    #6 0x2bd9e9fa in void std::__1::allocator_traits<std::__1::allocator<std::__1::__list_node<DB::QueryPlan::Node, void*> > >::destroy<DB::QueryPlan::Node>(std::__1::allocator<std::__1::__list_node<DB::QueryPlan::Node, void*> >&, DB::QueryPlan::Node*) /build/obj-x86_64-lin
ux-gnu/../contrib/libcxx/include/memory:1630:14
    #7 0x2bd9e9fa in std::__1::__list_imp<DB::QueryPlan::Node, std::__1::allocator<DB::QueryPlan::Node> >::clear() /build/obj-x86_64-linux-gnu/../contrib/libcxx/include/list:762:13
    #8 0x29fece08 in DB::InterpreterSelectQuery::execute() /build/obj-x86_64-linux-gnu/../src/Interpreters/InterpreterSelectQuery.cpp:492:1
    #9 0x2abf7484 in DB::ClusterProxy::(anonymous namespace)::createLocalStream(std::__1::shared_ptr<DB::IAST> const&, DB::Block const&, DB::Context const&, DB::QueryProcessingStage::Enum) /build/obj-x86_64-linux-gnu/../src/Interpreters/ClusterProxy/SelectStreamFactory.cpp:
78:33
    #10 0x2abea85d in DB::ClusterProxy::SelectStreamFactory::createForShard(DB::Cluster::ShardInfo const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::shared_ptr<DB::IAST> const&, DB::Context const&, std::__1::shar
ed_ptr<DB::Throttler> const&, DB::SelectQueryInfo const&, std::__1::vector<DB::Pipe, std::__1::allocator<DB::Pipe> >&)::$_0::operator()() const /build/obj-x86_64-linux-gnu/../src/Interpreters/ClusterProxy/SelectStreamFactory.cpp:133:51
    #11 0x2abea85d in DB::ClusterProxy::SelectStreamFactory::createForShard(DB::Cluster::ShardInfo const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::shared_ptr<DB::IAST> const&, DB::Context const&, std::__1::shar
ed_ptr<DB::Throttler> const&, DB::SelectQueryInfo const&, std::__1::vector<DB::Pipe, std::__1::allocator<DB::Pipe> >&) /build/obj-x86_64-linux-gnu/../src/Interpreters/ClusterProxy/SelectStreamFactory.cpp:189:13
    #12 0x2abe6d99 in DB::ClusterProxy::executeQuery(DB::ClusterProxy::IStreamFactory&, std::__1::shared_ptr<DB::Cluster> const&, Poco::Logger*, std::__1::shared_ptr<DB::IAST> const&, DB::Context const&, DB::Settings const&, DB::SelectQueryInfo const&) /build/obj-x86_64-lin
ux-gnu/../src/Interpreters/ClusterProxy/executeQuery.cpp:107:24
    #13 0x2abc4b74 in DB::StorageDistributed::read(std::__1::vector<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::allocator<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > > > const&, s
td::__1::shared_ptr<DB::StorageInMemoryMetadata const> const&, DB::SelectQueryInfo const&, DB::Context const&, DB::QueryProcessingStage::Enum, unsigned long, unsigned int) /build/obj-x86_64-linux-gnu/../src/Storages/StorageDistributed.cpp:514:12
    #14 0x2bda1c5a in DB::ReadFromStorageStep::ReadFromStorageStep(std::__1::shared_ptr<DB::RWLockImpl::LockHolderImpl>, std::__1::shared_ptr<DB::StorageInMemoryMetadata const>&, DB::SelectQueryOptions, std::__1::shared_ptr<DB::IStorage>, std::__1::vector<std::__1::basic_st
ring<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::allocator<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > > > const&, DB::SelectQueryInfo const&, std::__1::shared_ptr<DB::Context>, DB::QueryProcessingStage
::Enum, unsigned long, unsigned long) /build/obj-x86_64-linux-gnu/../src/Processors/QueryPlan/ReadFromStorageStep.cpp:39:26
    #15 0x2a01ca70 in std::__1::__unique_if<DB::ReadFromStorageStep>::__unique_single std::__1::make_unique<DB::ReadFromStorageStep, std::__1::shared_ptr<DB::RWLockImpl::LockHolderImpl>&, std::__1::shared_ptr<DB::StorageInMemoryMetadata const>&, DB::SelectQueryOptions&, std
::__1::shared_ptr<DB::IStorage>&, std::__1::vector<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::allocator<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > > >&, DB::SelectQueryInfo&, st
d::__1::shared_ptr<DB::Context>&, DB::QueryProcessingStage::Enum&, unsigned long&, unsigned long&>(std::__1::shared_ptr<DB::RWLockImpl::LockHolderImpl>&, std::__1::shared_ptr<DB::StorageInMemoryMetadata const>&, DB::SelectQueryOptions&, std::__1::shared_ptr<DB::IStorage>&,
std::__1::vector<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::allocator<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > > >&, DB::SelectQueryInfo&, std::__1::shared_ptr<DB::Context>&,
DB::QueryProcessingStage::Enum&, unsigned long&, unsigned long&) /build/obj-x86_64-linux-gnu/../contrib/libcxx/include/memory:3028:32
    #16 0x29ff556a in DB::InterpreterSelectQuery::executeFetchColumns(DB::QueryProcessingStage::Enum, DB::QueryPlan&, std::__1::shared_ptr<DB::PrewhereInfo> const&, std::__1::vector<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::
__1::allocator<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > > > const&) /build/obj-x86_64-linux-gnu/../src/Interpreters/InterpreterSelectQuery.cpp:1383:26
    #17 0x29fe6b83 in DB::InterpreterSelectQuery::executeImpl(DB::QueryPlan&, std::__1::shared_ptr<DB::IBlockInputStream> const&, std::__1::optional<DB::Pipe>) /build/obj-x86_64-linux-gnu/../src/Interpreters/InterpreterSelectQuery.cpp:795:9
    #18 0x29fe5771 in DB::InterpreterSelectQuery::buildQueryPlan(DB::QueryPlan&) /build/obj-x86_64-linux-gnu/../src/Interpreters/InterpreterSelectQuery.cpp:473:5
    #19 0x2a47d370 in DB::InterpreterSelectWithUnionQuery::buildQueryPlan(DB::QueryPlan&) /build/obj-x86_64-linux-gnu/../src/Interpreters/InterpreterSelectWithUnionQuery.cpp:182:38
    #20 0x29ee5bff in DB::InterpreterExplainQuery::executeImpl() /build/obj-x86_64-linux-gnu/../src/Interpreters/InterpreterExplainQuery.cpp:265:21
    #21 0x29ee2e40 in DB::InterpreterExplainQuery::execute() /build/obj-x86_64-linux-gnu/../src/Interpreters/InterpreterExplainQuery.cpp:73:14
    #22 0x2a7b44a2 in DB::executeQueryImpl(char const*, char const*, DB::Context&, bool, DB::QueryProcessingStage::Enum, bool, DB::ReadBuffer*) /build/obj-x86_64-linux-gnu/../src/Interpreters/executeQuery.cpp:389:28
    #23 0x2a7b1cb3 in DB::executeQuery(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, DB::Context&, bool, DB::QueryProcessingStage::Enum, bool) /build/obj-x86_64-linux-gnu/../src/Interpreters/executeQuery.cpp:675:30
    #24 0x2b7993b2 in DB::TCPHandler::runImpl() /build/obj-x86_64-linux-gnu/../src/Server/TCPHandler.cpp:253:24
    #25 0x2b7b649a in DB::TCPHandler::run() /build/obj-x86_64-linux-gnu/../src/Server/TCPHandler.cpp:1217:9
    #26 0x31d9c57e in Poco::Net::TCPServerConnection::start() /build/obj-x86_64-linux-gnu/../contrib/poco/Net/src/TCPServerConnection.cpp:43:3
    #27 0x31d9d281 in Poco::Net::TCPServerDispatcher::run() /build/obj-x86_64-linux-gnu/../contrib/poco/Net/src/TCPServerDispatcher.cpp:114:20
    #28 0x3206b5d5 in Poco::PooledThread::run() /build/obj-x86_64-linux-gnu/../contrib/poco/Foundation/src/ThreadPool.cpp:199:14
    #29 0x320657ad in Poco::ThreadImpl::runnableEntry(void*) /build/obj-x86_64-linux-gnu/../contrib/poco/Foundation/src/Thread_POSIX.cpp:345:27
    #30 0x7ffff7f853e8 in start_thread (/usr/lib/libpthread.so.0+0x93e8)
```

</details>
This commit is contained in:
Azat Khuzhin 2020-09-01 02:22:33 +03:00
parent 05b10048a6
commit d04cda0367
7 changed files with 34 additions and 7 deletions

View File

@ -13,6 +13,7 @@
#include <Processors/Transforms/ConvertingTransform.h> #include <Processors/Transforms/ConvertingTransform.h>
#include <Processors/Sources/RemoteSource.h> #include <Processors/Sources/RemoteSource.h>
#include <Processors/Sources/DelayedSource.h> #include <Processors/Sources/DelayedSource.h>
#include <Processors/QueryPlan/QueryPlan.h>
namespace ProfileEvents namespace ProfileEvents
{ {
@ -68,14 +69,19 @@ SelectStreamFactory::SelectStreamFactory(
namespace namespace
{ {
QueryPipeline createLocalStream( auto createLocalPipe(
const ASTPtr & query_ast, const Block & header, const Context & context, QueryProcessingStage::Enum processed_stage) const ASTPtr & query_ast, const Block & header, const Context & context, QueryProcessingStage::Enum processed_stage)
{ {
checkStackSize(); checkStackSize();
InterpreterSelectQuery interpreter{query_ast, context, SelectQueryOptions(processed_stage)}; InterpreterSelectQuery interpreter(query_ast, context, SelectQueryOptions(processed_stage));
auto query_plan = std::make_unique<QueryPlan>();
auto pipeline = interpreter.execute().pipeline; interpreter.buildQueryPlan(*query_plan);
auto pipeline = std::move(*query_plan->buildQueryPipeline());
/// Avoid going it out-of-scope for EXPLAIN
pipeline.addQueryPlan(std::move(query_plan));
pipeline.addSimpleTransform([&](const Block & source_header) pipeline.addSimpleTransform([&](const Block & source_header)
{ {
@ -94,7 +100,7 @@ QueryPipeline createLocalStream(
/// return std::make_shared<MaterializingBlockInputStream>(stream); /// return std::make_shared<MaterializingBlockInputStream>(stream);
pipeline.setMaxThreads(1); pipeline.setMaxThreads(1);
return pipeline; return QueryPipeline::getPipe(std::move(pipeline));
} }
String formattedAST(const ASTPtr & ast) String formattedAST(const ASTPtr & ast)
@ -130,7 +136,7 @@ void SelectStreamFactory::createForShard(
auto emplace_local_stream = [&]() auto emplace_local_stream = [&]()
{ {
pipes.emplace_back(QueryPipeline::getPipe(createLocalStream(modified_query_ast, header, context, processed_stage))); pipes.emplace_back(createLocalPipe(modified_query_ast, header, context, processed_stage));
}; };
String modified_query = formattedAST(modified_query_ast); String modified_query = formattedAST(modified_query_ast);
@ -270,7 +276,7 @@ void SelectStreamFactory::createForShard(
} }
if (try_results.empty() || local_delay < max_remote_delay) if (try_results.empty() || local_delay < max_remote_delay)
return QueryPipeline::getPipe(createLocalStream(modified_query_ast, header, context, stage)); return createLocalPipe(modified_query_ast, header, context, stage);
else else
{ {
std::vector<IConnectionPool::Entry> connections; std::vector<IConnectionPool::Entry> connections;

View File

@ -269,7 +269,9 @@ BlockInputStreamPtr InterpreterExplainQuery::executeImpl()
if (settings.graph) if (settings.graph)
{ {
auto processors = Pipe::detachProcessors(QueryPipeline::getPipe(std::move(*pipeline))); /// Pipe holds QueryPlan, should not go out-of-scope
auto pipe = QueryPipeline::getPipe(std::move(*pipeline));
const auto & processors = pipe.getProcessors();
if (settings.compact) if (settings.compact)
printPipelineCompact(processors, buffer, settings.query_pipeline_options.header); printPipelineCompact(processors, buffer, settings.query_pipeline_options.header);

View File

@ -102,6 +102,8 @@ Pipe::Holder & Pipe::Holder::operator=(Holder && rhs)
storage_holders.insert(storage_holders.end(), rhs.storage_holders.begin(), rhs.storage_holders.end()); storage_holders.insert(storage_holders.end(), rhs.storage_holders.begin(), rhs.storage_holders.end());
interpreter_context.insert(interpreter_context.end(), interpreter_context.insert(interpreter_context.end(),
rhs.interpreter_context.begin(), rhs.interpreter_context.end()); rhs.interpreter_context.begin(), rhs.interpreter_context.end());
for (auto & plan : rhs.query_plans)
query_plans.emplace_back(std::move(plan));
return *this; return *this;
} }

View File

@ -1,6 +1,7 @@
#pragma once #pragma once
#include <Processors/IProcessor.h> #include <Processors/IProcessor.h>
#include <Processors/Sources/SourceWithProgress.h> #include <Processors/Sources/SourceWithProgress.h>
#include <Processors/QueryPlan/QueryPlan.h>
namespace DB namespace DB
{ {
@ -8,6 +9,8 @@ namespace DB
class Pipe; class Pipe;
using Pipes = std::vector<Pipe>; using Pipes = std::vector<Pipe>;
class QueryPipeline;
class IStorage; class IStorage;
using StoragePtr = std::shared_ptr<IStorage>; using StoragePtr = std::shared_ptr<IStorage>;
@ -86,6 +89,8 @@ public:
/// Get processors from Pipe. Use it with cautious, it is easy to loss totals and extremes ports. /// Get processors from Pipe. Use it with cautious, it is easy to loss totals and extremes ports.
static Processors detachProcessors(Pipe pipe) { return std::move(pipe.processors); } static Processors detachProcessors(Pipe pipe) { return std::move(pipe.processors); }
/// Get processors from Pipe w/o destroying pipe (used for EXPLAIN to keep QueryPlan).
const Processors & getProcessors() const { return processors; }
/// Specify quotas and limits for every ISourceWithProgress. /// Specify quotas and limits for every ISourceWithProgress.
void setLimits(const SourceWithProgress::LocalLimits & limits); void setLimits(const SourceWithProgress::LocalLimits & limits);
@ -96,6 +101,8 @@ public:
/// This methods are from QueryPipeline. Needed to make conversion from pipeline to pipe possible. /// This methods are from QueryPipeline. Needed to make conversion from pipeline to pipe possible.
void addInterpreterContext(std::shared_ptr<Context> context) { holder.interpreter_context.emplace_back(std::move(context)); } void addInterpreterContext(std::shared_ptr<Context> context) { holder.interpreter_context.emplace_back(std::move(context)); }
void addStorageHolder(StoragePtr storage) { holder.storage_holders.emplace_back(std::move(storage)); } void addStorageHolder(StoragePtr storage) { holder.storage_holders.emplace_back(std::move(storage)); }
/// For queries with nested interpreters (i.e. StorageDistributed)
void addQueryPlan(std::unique_ptr<QueryPlan> plan) { holder.query_plans.emplace_back(std::move(plan)); }
private: private:
/// Destruction order: processors, header, locks, temporary storages, local contexts /// Destruction order: processors, header, locks, temporary storages, local contexts
@ -113,6 +120,7 @@ private:
std::vector<std::shared_ptr<Context>> interpreter_context; std::vector<std::shared_ptr<Context>> interpreter_context;
std::vector<StoragePtr> storage_holders; std::vector<StoragePtr> storage_holders;
std::vector<TableLockHolder> table_locks; std::vector<TableLockHolder> table_locks;
std::vector<std::unique_ptr<QueryPlan>> query_plans;
}; };
Holder holder; Holder holder;

View File

@ -21,6 +21,8 @@ class QueryPipelineProcessorsCollector;
struct AggregatingTransformParams; struct AggregatingTransformParams;
using AggregatingTransformParamsPtr = std::shared_ptr<AggregatingTransformParams>; using AggregatingTransformParamsPtr = std::shared_ptr<AggregatingTransformParams>;
class QueryPlan;
class QueryPipeline class QueryPipeline
{ {
public: public:
@ -93,6 +95,7 @@ public:
void addTableLock(const TableLockHolder & lock) { pipe.addTableLock(lock); } void addTableLock(const TableLockHolder & lock) { pipe.addTableLock(lock); }
void addInterpreterContext(std::shared_ptr<Context> context) { pipe.addInterpreterContext(std::move(context)); } void addInterpreterContext(std::shared_ptr<Context> context) { pipe.addInterpreterContext(std::move(context)); }
void addStorageHolder(StoragePtr storage) { pipe.addStorageHolder(std::move(storage)); } void addStorageHolder(StoragePtr storage) { pipe.addStorageHolder(std::move(storage)); }
void addQueryPlan(std::unique_ptr<QueryPlan> plan) { pipe.addQueryPlan(std::move(plan)); }
/// For compatibility with IBlockInputStream. /// For compatibility with IBlockInputStream.
void setProgressCallback(const ProgressCallback & callback); void setProgressCallback(const ProgressCallback & callback);

View File

@ -0,0 +1,6 @@
--
-- regressions
--
-- SIGSEGV regression due to QueryPlan lifetime
EXPLAIN PIPELINE graph=1 SELECT * FROM remote('127.{1,2}', system.one) FORMAT Null;