ClickHouse/src/Processors/QueryPipeline.h

182 lines
6.7 KiB
C++
Raw Normal View History

2019-03-26 18:28:37 +00:00
#pragma once
2019-04-05 10:52:07 +00:00
#include <DataStreams/IBlockInputStream.h>
#include <DataStreams/IBlockOutputStream.h>
#include <Processors/Executors/PipelineExecutor.h>
#include <Processors/IProcessor.h>
#include <Processors/Pipe.h>
#include <Storages/IStorage_fwd.h>
2020-06-22 09:49:21 +00:00
#include <Storages/TableLockHolder.h>
2019-03-26 18:28:37 +00:00
namespace DB
{
2019-04-08 14:55:20 +00:00
class IOutputFormat;
2020-06-25 09:39:17 +00:00
class QueryPipelineProcessorsCollector;
2020-08-04 13:06:59 +00:00
struct AggregatingTransformParams;
using AggregatingTransformParamsPtr = std::shared_ptr<AggregatingTransformParams>;
Fix QueryPlan lifetime (for EXPLAIN PIPELINE graph=1) for queries with nested interpreter Example of such queries are distributed queries, which creates local InterpreterSelectQuery, which will have it's own QueryPlan but returns Pipes that has that IQueryPlanStep attached. After EXPLAIN PIPELINE graph=1 tries to use them, and will get SIGSEGV. - TSAN: <details> ``` ==2782113==ERROR: AddressSanitizer: heap-use-after-free on address 0x6120000223c0 at pc 0x00002b8f3f3e bp 0x7fff18cfbff0 sp 0x7fff18cfbfe8 READ of size 8 at 0x6120000223c0 thread T22 (TCPHandler) #0 0x2b8f3f3d in DB::printPipelineCompact(std::__1::vector<std::__1::shared_ptr<DB::IProcessor>, std::__1::allocator<std::__1::shared_ptr<DB::IProcessor> > > const&, DB::WriteBuffer&, bool) /build/obj-x86_64-linux-gnu/../src/Processors/printPipeline.cpp:116:53 #1 0x29ee698c in DB::InterpreterExplainQuery::executeImpl() /build/obj-x86_64-linux-gnu/../src/Interpreters/InterpreterExplainQuery.cpp:275:17 #2 0x29ee2e40 in DB::InterpreterExplainQuery::execute() /build/obj-x86_64-linux-gnu/../src/Interpreters/InterpreterExplainQuery.cpp:73:14 #3 0x2a7b44a2 in DB::executeQueryImpl(char const*, char const*, DB::Context&, bool, DB::QueryProcessingStage::Enum, bool, DB::ReadBuffer*) /build/obj-x86_64-linux-gnu/../src/Interpreters/executeQuery.cpp:389:28 #4 0x2a7b1cb3 in DB::executeQuery(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, DB::Context&, bool, DB::QueryProcessingStage::Enum, bool) /build/obj-x86_64-linux-gnu/../src/Interpreters/executeQuery.cpp:675:30 #5 0x2b7993b2 in DB::TCPHandler::runImpl() /build/obj-x86_64-linux-gnu/../src/Server/TCPHandler.cpp:253:24 #6 0x2b7b649a in DB::TCPHandler::run() /build/obj-x86_64-linux-gnu/../src/Server/TCPHandler.cpp:1217:9 #7 0x31d9c57e in Poco::Net::TCPServerConnection::start() /build/obj-x86_64-linux-gnu/../contrib/poco/Net/src/TCPServerConnection.cpp:43:3 #8 0x31d9d281 in Poco::Net::TCPServerDispatcher::run() /build/obj-x86_64-linux-gnu/../contrib/poco/Net/src/TCPServerDispatcher.cpp:114:20 #9 0x3206b5d5 in Poco::PooledThread::run() /build/obj-x86_64-linux-gnu/../contrib/poco/Foundation/src/ThreadPool.cpp:199:14 #10 0x320657ad in Poco::ThreadImpl::runnableEntry(void*) /build/obj-x86_64-linux-gnu/../contrib/poco/Foundation/src/Thread_POSIX.cpp:345:27 #11 0x7ffff7f853e8 in start_thread (/usr/lib/libpthread.so.0+0x93e8) #12 0x7ffff7ea2292 in clone (/usr/lib/libc.so.6+0x100292) 0x6120000223c0 is located 0 bytes inside of 272-byte region [0x6120000223c0,0x6120000224d0) freed by thread T22 (TCPHandler) here: #0 0x122f3b62 in operator delete(void*, unsigned long) (/src/ch/tmp/master-20200831/clickhouse+0x122f3b62) #1 0x2bd9e9fa in std::__1::default_delete<DB::IQueryPlanStep>::operator()(DB::IQueryPlanStep*) const /build/obj-x86_64-linux-gnu/../contrib/libcxx/include/memory:2363:5 #2 0x2bd9e9fa in std::__1::unique_ptr<DB::IQueryPlanStep, std::__1::default_delete<DB::IQueryPlanStep> >::reset(DB::IQueryPlanStep*) /build/obj-x86_64-linux-gnu/../contrib/libcxx/include/memory:2618:7 #3 0x2bd9e9fa in std::__1::unique_ptr<DB::IQueryPlanStep, std::__1::default_delete<DB::IQueryPlanStep> >::~unique_ptr() /build/obj-x86_64-linux-gnu/../contrib/libcxx/include/memory:2572:19 #4 0x2bd9e9fa in DB::QueryPlan::Node::~Node() /build/obj-x86_64-linux-gnu/../src/Processors/QueryPlan/QueryPlan.h:66:12 #5 0x2bd9e9fa in void std::__1::allocator_traits<std::__1::allocator<std::__1::__list_node<DB::QueryPlan::Node, void*> > >::__destroy<DB::QueryPlan::Node>(std::__1::integral_constant<bool, false>, std::__1::allocator<std::__1::__list_node<DB::QueryPlan::Node, void*> >&, DB::QueryPlan::Node*) /build/obj-x86_64-linux-gnu/../contrib/libcxx/include/memory:1798:23 #6 0x2bd9e9fa in void std::__1::allocator_traits<std::__1::allocator<std::__1::__list_node<DB::QueryPlan::Node, void*> > >::destroy<DB::QueryPlan::Node>(std::__1::allocator<std::__1::__list_node<DB::QueryPlan::Node, void*> >&, DB::QueryPlan::Node*) /build/obj-x86_64-lin ux-gnu/../contrib/libcxx/include/memory:1630:14 #7 0x2bd9e9fa in std::__1::__list_imp<DB::QueryPlan::Node, std::__1::allocator<DB::QueryPlan::Node> >::clear() /build/obj-x86_64-linux-gnu/../contrib/libcxx/include/list:762:13 #8 0x29fece08 in DB::InterpreterSelectQuery::execute() /build/obj-x86_64-linux-gnu/../src/Interpreters/InterpreterSelectQuery.cpp:492:1 #9 0x2abf7484 in DB::ClusterProxy::(anonymous namespace)::createLocalStream(std::__1::shared_ptr<DB::IAST> const&, DB::Block const&, DB::Context const&, DB::QueryProcessingStage::Enum) /build/obj-x86_64-linux-gnu/../src/Interpreters/ClusterProxy/SelectStreamFactory.cpp: 78:33 #10 0x2abea85d in DB::ClusterProxy::SelectStreamFactory::createForShard(DB::Cluster::ShardInfo const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::shared_ptr<DB::IAST> const&, DB::Context const&, std::__1::shar ed_ptr<DB::Throttler> const&, DB::SelectQueryInfo const&, std::__1::vector<DB::Pipe, std::__1::allocator<DB::Pipe> >&)::$_0::operator()() const /build/obj-x86_64-linux-gnu/../src/Interpreters/ClusterProxy/SelectStreamFactory.cpp:133:51 #11 0x2abea85d in DB::ClusterProxy::SelectStreamFactory::createForShard(DB::Cluster::ShardInfo const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::shared_ptr<DB::IAST> const&, DB::Context const&, std::__1::shar ed_ptr<DB::Throttler> const&, DB::SelectQueryInfo const&, std::__1::vector<DB::Pipe, std::__1::allocator<DB::Pipe> >&) /build/obj-x86_64-linux-gnu/../src/Interpreters/ClusterProxy/SelectStreamFactory.cpp:189:13 #12 0x2abe6d99 in DB::ClusterProxy::executeQuery(DB::ClusterProxy::IStreamFactory&, std::__1::shared_ptr<DB::Cluster> const&, Poco::Logger*, std::__1::shared_ptr<DB::IAST> const&, DB::Context const&, DB::Settings const&, DB::SelectQueryInfo const&) /build/obj-x86_64-lin ux-gnu/../src/Interpreters/ClusterProxy/executeQuery.cpp:107:24 #13 0x2abc4b74 in DB::StorageDistributed::read(std::__1::vector<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::allocator<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > > > const&, s td::__1::shared_ptr<DB::StorageInMemoryMetadata const> const&, DB::SelectQueryInfo const&, DB::Context const&, DB::QueryProcessingStage::Enum, unsigned long, unsigned int) /build/obj-x86_64-linux-gnu/../src/Storages/StorageDistributed.cpp:514:12 #14 0x2bda1c5a in DB::ReadFromStorageStep::ReadFromStorageStep(std::__1::shared_ptr<DB::RWLockImpl::LockHolderImpl>, std::__1::shared_ptr<DB::StorageInMemoryMetadata const>&, DB::SelectQueryOptions, std::__1::shared_ptr<DB::IStorage>, std::__1::vector<std::__1::basic_st ring<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::allocator<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > > > const&, DB::SelectQueryInfo const&, std::__1::shared_ptr<DB::Context>, DB::QueryProcessingStage ::Enum, unsigned long, unsigned long) /build/obj-x86_64-linux-gnu/../src/Processors/QueryPlan/ReadFromStorageStep.cpp:39:26 #15 0x2a01ca70 in std::__1::__unique_if<DB::ReadFromStorageStep>::__unique_single std::__1::make_unique<DB::ReadFromStorageStep, std::__1::shared_ptr<DB::RWLockImpl::LockHolderImpl>&, std::__1::shared_ptr<DB::StorageInMemoryMetadata const>&, DB::SelectQueryOptions&, std ::__1::shared_ptr<DB::IStorage>&, std::__1::vector<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::allocator<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > > >&, DB::SelectQueryInfo&, st d::__1::shared_ptr<DB::Context>&, DB::QueryProcessingStage::Enum&, unsigned long&, unsigned long&>(std::__1::shared_ptr<DB::RWLockImpl::LockHolderImpl>&, std::__1::shared_ptr<DB::StorageInMemoryMetadata const>&, DB::SelectQueryOptions&, std::__1::shared_ptr<DB::IStorage>&, std::__1::vector<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::allocator<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > > >&, DB::SelectQueryInfo&, std::__1::shared_ptr<DB::Context>&, DB::QueryProcessingStage::Enum&, unsigned long&, unsigned long&) /build/obj-x86_64-linux-gnu/../contrib/libcxx/include/memory:3028:32 #16 0x29ff556a in DB::InterpreterSelectQuery::executeFetchColumns(DB::QueryProcessingStage::Enum, DB::QueryPlan&, std::__1::shared_ptr<DB::PrewhereInfo> const&, std::__1::vector<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std:: __1::allocator<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > > > const&) /build/obj-x86_64-linux-gnu/../src/Interpreters/InterpreterSelectQuery.cpp:1383:26 #17 0x29fe6b83 in DB::InterpreterSelectQuery::executeImpl(DB::QueryPlan&, std::__1::shared_ptr<DB::IBlockInputStream> const&, std::__1::optional<DB::Pipe>) /build/obj-x86_64-linux-gnu/../src/Interpreters/InterpreterSelectQuery.cpp:795:9 #18 0x29fe5771 in DB::InterpreterSelectQuery::buildQueryPlan(DB::QueryPlan&) /build/obj-x86_64-linux-gnu/../src/Interpreters/InterpreterSelectQuery.cpp:473:5 #19 0x2a47d370 in DB::InterpreterSelectWithUnionQuery::buildQueryPlan(DB::QueryPlan&) /build/obj-x86_64-linux-gnu/../src/Interpreters/InterpreterSelectWithUnionQuery.cpp:182:38 #20 0x29ee5bff in DB::InterpreterExplainQuery::executeImpl() /build/obj-x86_64-linux-gnu/../src/Interpreters/InterpreterExplainQuery.cpp:265:21 #21 0x29ee2e40 in DB::InterpreterExplainQuery::execute() /build/obj-x86_64-linux-gnu/../src/Interpreters/InterpreterExplainQuery.cpp:73:14 #22 0x2a7b44a2 in DB::executeQueryImpl(char const*, char const*, DB::Context&, bool, DB::QueryProcessingStage::Enum, bool, DB::ReadBuffer*) /build/obj-x86_64-linux-gnu/../src/Interpreters/executeQuery.cpp:389:28 #23 0x2a7b1cb3 in DB::executeQuery(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, DB::Context&, bool, DB::QueryProcessingStage::Enum, bool) /build/obj-x86_64-linux-gnu/../src/Interpreters/executeQuery.cpp:675:30 #24 0x2b7993b2 in DB::TCPHandler::runImpl() /build/obj-x86_64-linux-gnu/../src/Server/TCPHandler.cpp:253:24 #25 0x2b7b649a in DB::TCPHandler::run() /build/obj-x86_64-linux-gnu/../src/Server/TCPHandler.cpp:1217:9 #26 0x31d9c57e in Poco::Net::TCPServerConnection::start() /build/obj-x86_64-linux-gnu/../contrib/poco/Net/src/TCPServerConnection.cpp:43:3 #27 0x31d9d281 in Poco::Net::TCPServerDispatcher::run() /build/obj-x86_64-linux-gnu/../contrib/poco/Net/src/TCPServerDispatcher.cpp:114:20 #28 0x3206b5d5 in Poco::PooledThread::run() /build/obj-x86_64-linux-gnu/../contrib/poco/Foundation/src/ThreadPool.cpp:199:14 #29 0x320657ad in Poco::ThreadImpl::runnableEntry(void*) /build/obj-x86_64-linux-gnu/../contrib/poco/Foundation/src/Thread_POSIX.cpp:345:27 #30 0x7ffff7f853e8 in start_thread (/usr/lib/libpthread.so.0+0x93e8) ``` </details>
2020-08-31 23:22:33 +00:00
class QueryPlan;
struct SubqueryForSet;
using SubqueriesForSets = std::unordered_map<String, SubqueryForSet>;
2020-09-16 16:30:48 +00:00
struct SizeLimits;
2021-03-04 17:38:12 +00:00
struct ExpressionActionsSettings;
2019-03-26 18:28:37 +00:00
class QueryPipeline
{
public:
QueryPipeline() = default;
2020-02-27 15:40:11 +00:00
~QueryPipeline() = default;
2020-08-04 13:06:59 +00:00
QueryPipeline(QueryPipeline &&) = default;
2020-02-27 15:40:11 +00:00
QueryPipeline(const QueryPipeline &) = delete;
2020-08-04 15:51:56 +00:00
QueryPipeline & operator= (QueryPipeline && rhs) = default;
2020-08-04 13:06:59 +00:00
QueryPipeline & operator= (const QueryPipeline & rhs) = delete;
2019-03-26 18:28:37 +00:00
2019-11-05 17:33:03 +00:00
/// All pipes must have same header.
2020-08-03 15:54:53 +00:00
void init(Pipe pipe);
2020-08-06 12:24:05 +00:00
/// Clear and release all resources.
void reset();
2020-08-04 13:06:59 +00:00
bool initialized() { return !pipe.empty(); }
bool isCompleted() { return pipe.isCompleted(); }
2019-04-09 10:17:25 +00:00
2020-08-04 13:06:59 +00:00
using StreamType = Pipe::StreamType;
2019-03-26 18:28:37 +00:00
2020-05-27 18:20:26 +00:00
/// Add transform with simple input and simple output for each port.
2020-08-04 13:06:59 +00:00
void addSimpleTransform(const Pipe::ProcessorGetter & getter);
void addSimpleTransform(const Pipe::ProcessorGetterWithStreamKind & getter);
/// Add transform with getNumStreams() input ports.
void addTransform(ProcessorPtr transform);
using Transformer = std::function<Processors(OutputPortRawPtrs ports)>;
/// Transform pipeline in general way.
void transform(const Transformer & transformer);
2020-05-27 18:20:26 +00:00
/// Add TotalsHavingTransform. Resize pipeline to single input. Adds totals port.
2019-03-26 18:28:37 +00:00
void addTotalsHavingTransform(ProcessorPtr transform);
2020-05-27 18:20:26 +00:00
/// Add transform which calculates extremes. This transform adds extremes port and doesn't change inputs number.
2020-04-08 12:40:04 +00:00
void addExtremesTransform();
2020-05-27 18:20:26 +00:00
/// Resize pipeline to single output and add IOutputFormat. Pipeline will be completed after this transformation.
void setOutputFormat(ProcessorPtr output);
/// Get current OutputFormat.
IOutputFormat * getOutputFormat() const { return output_format; }
2020-05-27 18:20:26 +00:00
/// Sink is a processor with single input port and no output ports. Creates sink for each output port.
/// Pipeline will be completed after this transformation.
2020-08-04 13:06:59 +00:00
void setSinks(const Pipe::ProcessorGetterWithStreamKind & getter);
2019-03-26 18:28:37 +00:00
2019-04-09 14:51:38 +00:00
/// Add totals which returns one chunk with single row with defaults.
void addDefaultTotals();
/// Forget about current totals and extremes. It is needed before aggregation, cause they will be calculated again.
void dropTotalsAndExtremes();
2019-04-17 15:35:22 +00:00
2019-03-26 18:28:37 +00:00
/// Will read from this stream after all data was read from other streams.
void addDelayedStream(ProcessorPtr source);
2020-08-04 13:06:59 +00:00
void addMergingAggregatedMemoryEfficientTransform(AggregatingTransformParamsPtr params, size_t num_merging_processors);
2020-10-12 09:30:05 +00:00
/// Changes the number of output ports if needed. Adds ResizeTransform.
2020-01-13 12:04:02 +00:00
void resize(size_t num_streams, bool force = false, bool strict = false);
2019-03-26 18:28:37 +00:00
2020-06-25 09:39:17 +00:00
/// Unite several pipelines together. Result pipeline would have common_header structure.
/// If collector is used, it will collect only newly-added processors, but not processors from pipelines.
2020-08-04 15:51:56 +00:00
static QueryPipeline unitePipelines(
std::vector<std::unique_ptr<QueryPipeline>> pipelines,
size_t max_threads_limit = 0,
Processors * collected_processors = nullptr);
2019-03-26 18:28:37 +00:00
2020-09-15 17:13:13 +00:00
/// Add other pipeline and execute it before current one.
/// Pipeline must have empty header, it should not generate any chunk.
/// This is used for CreatingSets.
2020-09-21 08:36:12 +00:00
void addPipelineBefore(QueryPipeline pipeline);
2020-09-15 13:25:14 +00:00
void addCreatingSetsTransform(const Block & res_header, SubqueryForSet subquery_for_set, const SizeLimits & limits, ContextPtr context);
2020-09-16 16:11:16 +00:00
PipelineExecutorPtr execute();
2019-03-26 18:28:37 +00:00
2020-08-04 13:06:59 +00:00
size_t getNumStreams() const { return pipe.numOutputPorts(); }
2019-04-09 14:51:38 +00:00
2020-08-04 13:06:59 +00:00
bool hasTotals() const { return pipe.getTotalsPort() != nullptr; }
2019-03-26 18:28:37 +00:00
2020-08-04 13:06:59 +00:00
const Block & getHeader() const { return pipe.getHeader(); }
2019-03-26 18:28:37 +00:00
void addTableLock(TableLockHolder lock) { pipe.addTableLock(std::move(lock)); }
2020-08-04 13:06:59 +00:00
void addInterpreterContext(std::shared_ptr<Context> context) { pipe.addInterpreterContext(std::move(context)); }
void addStorageHolder(StoragePtr storage) { pipe.addStorageHolder(std::move(storage)); }
Fix QueryPlan lifetime (for EXPLAIN PIPELINE graph=1) for queries with nested interpreter Example of such queries are distributed queries, which creates local InterpreterSelectQuery, which will have it's own QueryPlan but returns Pipes that has that IQueryPlanStep attached. After EXPLAIN PIPELINE graph=1 tries to use them, and will get SIGSEGV. - TSAN: <details> ``` ==2782113==ERROR: AddressSanitizer: heap-use-after-free on address 0x6120000223c0 at pc 0x00002b8f3f3e bp 0x7fff18cfbff0 sp 0x7fff18cfbfe8 READ of size 8 at 0x6120000223c0 thread T22 (TCPHandler) #0 0x2b8f3f3d in DB::printPipelineCompact(std::__1::vector<std::__1::shared_ptr<DB::IProcessor>, std::__1::allocator<std::__1::shared_ptr<DB::IProcessor> > > const&, DB::WriteBuffer&, bool) /build/obj-x86_64-linux-gnu/../src/Processors/printPipeline.cpp:116:53 #1 0x29ee698c in DB::InterpreterExplainQuery::executeImpl() /build/obj-x86_64-linux-gnu/../src/Interpreters/InterpreterExplainQuery.cpp:275:17 #2 0x29ee2e40 in DB::InterpreterExplainQuery::execute() /build/obj-x86_64-linux-gnu/../src/Interpreters/InterpreterExplainQuery.cpp:73:14 #3 0x2a7b44a2 in DB::executeQueryImpl(char const*, char const*, DB::Context&, bool, DB::QueryProcessingStage::Enum, bool, DB::ReadBuffer*) /build/obj-x86_64-linux-gnu/../src/Interpreters/executeQuery.cpp:389:28 #4 0x2a7b1cb3 in DB::executeQuery(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, DB::Context&, bool, DB::QueryProcessingStage::Enum, bool) /build/obj-x86_64-linux-gnu/../src/Interpreters/executeQuery.cpp:675:30 #5 0x2b7993b2 in DB::TCPHandler::runImpl() /build/obj-x86_64-linux-gnu/../src/Server/TCPHandler.cpp:253:24 #6 0x2b7b649a in DB::TCPHandler::run() /build/obj-x86_64-linux-gnu/../src/Server/TCPHandler.cpp:1217:9 #7 0x31d9c57e in Poco::Net::TCPServerConnection::start() /build/obj-x86_64-linux-gnu/../contrib/poco/Net/src/TCPServerConnection.cpp:43:3 #8 0x31d9d281 in Poco::Net::TCPServerDispatcher::run() /build/obj-x86_64-linux-gnu/../contrib/poco/Net/src/TCPServerDispatcher.cpp:114:20 #9 0x3206b5d5 in Poco::PooledThread::run() /build/obj-x86_64-linux-gnu/../contrib/poco/Foundation/src/ThreadPool.cpp:199:14 #10 0x320657ad in Poco::ThreadImpl::runnableEntry(void*) /build/obj-x86_64-linux-gnu/../contrib/poco/Foundation/src/Thread_POSIX.cpp:345:27 #11 0x7ffff7f853e8 in start_thread (/usr/lib/libpthread.so.0+0x93e8) #12 0x7ffff7ea2292 in clone (/usr/lib/libc.so.6+0x100292) 0x6120000223c0 is located 0 bytes inside of 272-byte region [0x6120000223c0,0x6120000224d0) freed by thread T22 (TCPHandler) here: #0 0x122f3b62 in operator delete(void*, unsigned long) (/src/ch/tmp/master-20200831/clickhouse+0x122f3b62) #1 0x2bd9e9fa in std::__1::default_delete<DB::IQueryPlanStep>::operator()(DB::IQueryPlanStep*) const /build/obj-x86_64-linux-gnu/../contrib/libcxx/include/memory:2363:5 #2 0x2bd9e9fa in std::__1::unique_ptr<DB::IQueryPlanStep, std::__1::default_delete<DB::IQueryPlanStep> >::reset(DB::IQueryPlanStep*) /build/obj-x86_64-linux-gnu/../contrib/libcxx/include/memory:2618:7 #3 0x2bd9e9fa in std::__1::unique_ptr<DB::IQueryPlanStep, std::__1::default_delete<DB::IQueryPlanStep> >::~unique_ptr() /build/obj-x86_64-linux-gnu/../contrib/libcxx/include/memory:2572:19 #4 0x2bd9e9fa in DB::QueryPlan::Node::~Node() /build/obj-x86_64-linux-gnu/../src/Processors/QueryPlan/QueryPlan.h:66:12 #5 0x2bd9e9fa in void std::__1::allocator_traits<std::__1::allocator<std::__1::__list_node<DB::QueryPlan::Node, void*> > >::__destroy<DB::QueryPlan::Node>(std::__1::integral_constant<bool, false>, std::__1::allocator<std::__1::__list_node<DB::QueryPlan::Node, void*> >&, DB::QueryPlan::Node*) /build/obj-x86_64-linux-gnu/../contrib/libcxx/include/memory:1798:23 #6 0x2bd9e9fa in void std::__1::allocator_traits<std::__1::allocator<std::__1::__list_node<DB::QueryPlan::Node, void*> > >::destroy<DB::QueryPlan::Node>(std::__1::allocator<std::__1::__list_node<DB::QueryPlan::Node, void*> >&, DB::QueryPlan::Node*) /build/obj-x86_64-lin ux-gnu/../contrib/libcxx/include/memory:1630:14 #7 0x2bd9e9fa in std::__1::__list_imp<DB::QueryPlan::Node, std::__1::allocator<DB::QueryPlan::Node> >::clear() /build/obj-x86_64-linux-gnu/../contrib/libcxx/include/list:762:13 #8 0x29fece08 in DB::InterpreterSelectQuery::execute() /build/obj-x86_64-linux-gnu/../src/Interpreters/InterpreterSelectQuery.cpp:492:1 #9 0x2abf7484 in DB::ClusterProxy::(anonymous namespace)::createLocalStream(std::__1::shared_ptr<DB::IAST> const&, DB::Block const&, DB::Context const&, DB::QueryProcessingStage::Enum) /build/obj-x86_64-linux-gnu/../src/Interpreters/ClusterProxy/SelectStreamFactory.cpp: 78:33 #10 0x2abea85d in DB::ClusterProxy::SelectStreamFactory::createForShard(DB::Cluster::ShardInfo const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::shared_ptr<DB::IAST> const&, DB::Context const&, std::__1::shar ed_ptr<DB::Throttler> const&, DB::SelectQueryInfo const&, std::__1::vector<DB::Pipe, std::__1::allocator<DB::Pipe> >&)::$_0::operator()() const /build/obj-x86_64-linux-gnu/../src/Interpreters/ClusterProxy/SelectStreamFactory.cpp:133:51 #11 0x2abea85d in DB::ClusterProxy::SelectStreamFactory::createForShard(DB::Cluster::ShardInfo const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::shared_ptr<DB::IAST> const&, DB::Context const&, std::__1::shar ed_ptr<DB::Throttler> const&, DB::SelectQueryInfo const&, std::__1::vector<DB::Pipe, std::__1::allocator<DB::Pipe> >&) /build/obj-x86_64-linux-gnu/../src/Interpreters/ClusterProxy/SelectStreamFactory.cpp:189:13 #12 0x2abe6d99 in DB::ClusterProxy::executeQuery(DB::ClusterProxy::IStreamFactory&, std::__1::shared_ptr<DB::Cluster> const&, Poco::Logger*, std::__1::shared_ptr<DB::IAST> const&, DB::Context const&, DB::Settings const&, DB::SelectQueryInfo const&) /build/obj-x86_64-lin ux-gnu/../src/Interpreters/ClusterProxy/executeQuery.cpp:107:24 #13 0x2abc4b74 in DB::StorageDistributed::read(std::__1::vector<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::allocator<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > > > const&, s td::__1::shared_ptr<DB::StorageInMemoryMetadata const> const&, DB::SelectQueryInfo const&, DB::Context const&, DB::QueryProcessingStage::Enum, unsigned long, unsigned int) /build/obj-x86_64-linux-gnu/../src/Storages/StorageDistributed.cpp:514:12 #14 0x2bda1c5a in DB::ReadFromStorageStep::ReadFromStorageStep(std::__1::shared_ptr<DB::RWLockImpl::LockHolderImpl>, std::__1::shared_ptr<DB::StorageInMemoryMetadata const>&, DB::SelectQueryOptions, std::__1::shared_ptr<DB::IStorage>, std::__1::vector<std::__1::basic_st ring<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::allocator<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > > > const&, DB::SelectQueryInfo const&, std::__1::shared_ptr<DB::Context>, DB::QueryProcessingStage ::Enum, unsigned long, unsigned long) /build/obj-x86_64-linux-gnu/../src/Processors/QueryPlan/ReadFromStorageStep.cpp:39:26 #15 0x2a01ca70 in std::__1::__unique_if<DB::ReadFromStorageStep>::__unique_single std::__1::make_unique<DB::ReadFromStorageStep, std::__1::shared_ptr<DB::RWLockImpl::LockHolderImpl>&, std::__1::shared_ptr<DB::StorageInMemoryMetadata const>&, DB::SelectQueryOptions&, std ::__1::shared_ptr<DB::IStorage>&, std::__1::vector<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::allocator<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > > >&, DB::SelectQueryInfo&, st d::__1::shared_ptr<DB::Context>&, DB::QueryProcessingStage::Enum&, unsigned long&, unsigned long&>(std::__1::shared_ptr<DB::RWLockImpl::LockHolderImpl>&, std::__1::shared_ptr<DB::StorageInMemoryMetadata const>&, DB::SelectQueryOptions&, std::__1::shared_ptr<DB::IStorage>&, std::__1::vector<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::allocator<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > > >&, DB::SelectQueryInfo&, std::__1::shared_ptr<DB::Context>&, DB::QueryProcessingStage::Enum&, unsigned long&, unsigned long&) /build/obj-x86_64-linux-gnu/../contrib/libcxx/include/memory:3028:32 #16 0x29ff556a in DB::InterpreterSelectQuery::executeFetchColumns(DB::QueryProcessingStage::Enum, DB::QueryPlan&, std::__1::shared_ptr<DB::PrewhereInfo> const&, std::__1::vector<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std:: __1::allocator<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > > > const&) /build/obj-x86_64-linux-gnu/../src/Interpreters/InterpreterSelectQuery.cpp:1383:26 #17 0x29fe6b83 in DB::InterpreterSelectQuery::executeImpl(DB::QueryPlan&, std::__1::shared_ptr<DB::IBlockInputStream> const&, std::__1::optional<DB::Pipe>) /build/obj-x86_64-linux-gnu/../src/Interpreters/InterpreterSelectQuery.cpp:795:9 #18 0x29fe5771 in DB::InterpreterSelectQuery::buildQueryPlan(DB::QueryPlan&) /build/obj-x86_64-linux-gnu/../src/Interpreters/InterpreterSelectQuery.cpp:473:5 #19 0x2a47d370 in DB::InterpreterSelectWithUnionQuery::buildQueryPlan(DB::QueryPlan&) /build/obj-x86_64-linux-gnu/../src/Interpreters/InterpreterSelectWithUnionQuery.cpp:182:38 #20 0x29ee5bff in DB::InterpreterExplainQuery::executeImpl() /build/obj-x86_64-linux-gnu/../src/Interpreters/InterpreterExplainQuery.cpp:265:21 #21 0x29ee2e40 in DB::InterpreterExplainQuery::execute() /build/obj-x86_64-linux-gnu/../src/Interpreters/InterpreterExplainQuery.cpp:73:14 #22 0x2a7b44a2 in DB::executeQueryImpl(char const*, char const*, DB::Context&, bool, DB::QueryProcessingStage::Enum, bool, DB::ReadBuffer*) /build/obj-x86_64-linux-gnu/../src/Interpreters/executeQuery.cpp:389:28 #23 0x2a7b1cb3 in DB::executeQuery(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, DB::Context&, bool, DB::QueryProcessingStage::Enum, bool) /build/obj-x86_64-linux-gnu/../src/Interpreters/executeQuery.cpp:675:30 #24 0x2b7993b2 in DB::TCPHandler::runImpl() /build/obj-x86_64-linux-gnu/../src/Server/TCPHandler.cpp:253:24 #25 0x2b7b649a in DB::TCPHandler::run() /build/obj-x86_64-linux-gnu/../src/Server/TCPHandler.cpp:1217:9 #26 0x31d9c57e in Poco::Net::TCPServerConnection::start() /build/obj-x86_64-linux-gnu/../contrib/poco/Net/src/TCPServerConnection.cpp:43:3 #27 0x31d9d281 in Poco::Net::TCPServerDispatcher::run() /build/obj-x86_64-linux-gnu/../contrib/poco/Net/src/TCPServerDispatcher.cpp:114:20 #28 0x3206b5d5 in Poco::PooledThread::run() /build/obj-x86_64-linux-gnu/../contrib/poco/Foundation/src/ThreadPool.cpp:199:14 #29 0x320657ad in Poco::ThreadImpl::runnableEntry(void*) /build/obj-x86_64-linux-gnu/../contrib/poco/Foundation/src/Thread_POSIX.cpp:345:27 #30 0x7ffff7f853e8 in start_thread (/usr/lib/libpthread.so.0+0x93e8) ``` </details>
2020-08-31 23:22:33 +00:00
void addQueryPlan(std::unique_ptr<QueryPlan> plan) { pipe.addQueryPlan(std::move(plan)); }
void setLimits(const StreamLocalLimits & limits) { pipe.setLimits(limits); }
void setLeafLimits(const SizeLimits & limits) { pipe.setLeafLimits(limits); }
2020-09-17 13:26:34 +00:00
void setQuota(const std::shared_ptr<const EnabledQuota> & quota) { pipe.setQuota(quota); }
2019-03-26 18:28:37 +00:00
/// For compatibility with IBlockInputStream.
void setProgressCallback(const ProgressCallback & callback);
void setProcessListElement(QueryStatus * elem);
/// Recommend number of threads for pipeline execution.
size_t getNumThreads() const
{
2020-08-04 13:06:59 +00:00
auto num_threads = pipe.maxParallelStreams();
if (max_threads)
num_threads = std::min(num_threads, max_threads);
return std::max<size_t>(1, num_threads);
}
/// Set upper limit for the recommend number of threads
2019-08-27 18:37:28 +00:00
void setMaxThreads(size_t max_threads_) { max_threads = max_threads_; }
/// Update upper limit for the recommend number of threads
void limitMaxThreads(size_t max_threads_)
{
if (max_threads == 0 || max_threads_ < max_threads)
max_threads = max_threads_;
}
2020-08-03 11:33:11 +00:00
/// Convert query pipeline to pipe.
2020-08-04 15:51:56 +00:00
static Pipe getPipe(QueryPipeline pipeline) { return std::move(pipeline.pipe); }
2019-03-26 18:28:37 +00:00
private:
2020-02-27 15:40:11 +00:00
2020-08-04 13:06:59 +00:00
Pipe pipe;
2019-04-08 14:55:20 +00:00
IOutputFormat * output_format = nullptr;
2019-03-26 18:28:37 +00:00
/// Limit on the number of threads. Zero means no limit.
/// Sometimes, more streams are created then the number of threads for more optimal execution.
2019-08-27 18:37:28 +00:00
size_t max_threads = 0;
QueryStatus * process_list_element = nullptr;
2019-03-26 18:28:37 +00:00
void checkInitialized();
2020-05-27 18:20:26 +00:00
void checkInitializedAndNotCompleted();
2019-04-09 10:17:25 +00:00
void initRowsBeforeLimit();
2020-06-25 09:39:17 +00:00
2020-08-04 15:51:56 +00:00
void setCollectedProcessors(Processors * processors);
2020-06-25 09:39:17 +00:00
friend class QueryPipelineProcessorsCollector;
};
/// This is a small class which collects newly added processors to QueryPipeline.
/// Pipeline must live longer than this class.
2020-06-25 09:39:17 +00:00
class QueryPipelineProcessorsCollector
{
public:
explicit QueryPipelineProcessorsCollector(QueryPipeline & pipeline_, IQueryPlanStep * step_ = nullptr);
~QueryPipelineProcessorsCollector();
Processors detachProcessors(size_t group = 0);
private:
QueryPipeline & pipeline;
IQueryPlanStep * step;
Processors processors;
2019-03-26 18:28:37 +00:00
};
}