ClickHouse/src/QueryPipeline/Pipe.h

133 lines
5.6 KiB
C++
Raw Normal View History

2019-11-05 17:33:03 +00:00
#pragma once
#include <Processors/IProcessor.h>
2021-10-16 14:03:50 +00:00
#include <QueryPipeline/PipelineResourcesHolder.h>
#include <QueryPipeline/Chain.h>
2021-10-15 20:18:20 +00:00
#include <QueryPipeline/SizeLimits.h>
namespace DB
{
class EnabledQuota;
2020-09-15 10:40:39 +00:00
struct StreamLocalLimits;
2020-09-14 14:13:58 +00:00
class Pipe;
2020-08-03 11:33:11 +00:00
using Pipes = std::vector<Pipe>;
class ReadProgressCallback;
2020-08-03 11:33:11 +00:00
using OutputPortRawPtrs = std::vector<OutputPort *>;
2020-08-14 01:10:10 +00:00
/// Pipe is a set of processors which represents a part of the pipeline.
/// Pipe contains a list of output ports, with specified port for totals and specified port for extremes.
2020-07-31 16:54:54 +00:00
/// All output ports have same header.
/// All other ports are connected, all connections are inside processors set.
2020-08-03 11:33:11 +00:00
class Pipe
2020-07-31 16:54:54 +00:00
{
public:
2020-08-03 11:33:11 +00:00
/// Default constructor creates empty pipe. Generally, you cannot do anything with it except to check it is empty().
/// You cannot get empty pipe in any other way. All transforms check that result pipe is not empty.
Pipe() = default;
2020-07-31 16:54:54 +00:00
/// Create from source. Source must have no input ports and single output.
2020-08-03 11:33:11 +00:00
explicit Pipe(ProcessorPtr source);
2020-08-04 13:06:59 +00:00
/// Create from source with specified totals and extremes (may be nullptr). Ports should be owned by source.
explicit Pipe(ProcessorPtr source, OutputPort * output, OutputPort * totals, OutputPort * extremes);
2020-07-31 16:54:54 +00:00
/// Create from processors. Use all not-connected output ports as output_ports. Check invariants.
2020-08-03 11:33:11 +00:00
explicit Pipe(Processors processors_);
2020-07-31 16:54:54 +00:00
2020-08-03 11:33:11 +00:00
Pipe(const Pipe & other) = delete;
Pipe(Pipe && other) = default;
Pipe & operator=(const Pipe & other) = delete;
Pipe & operator=(Pipe && other) = default;
2020-07-31 16:54:54 +00:00
/// Header shared by every output port of this Pipe (returned by reference, no copy).
const Block & getHeader() const { return header; }
2020-08-04 13:06:59 +00:00
/// True when the Pipe holds no processors, e.g. a default-constructed Pipe.
bool empty() const { return processors.empty(); }
2020-08-03 11:33:11 +00:00
/// Number of main output ports; the totals and extremes ports are kept in separate members and are not counted.
size_t numOutputPorts() const { return output_ports.size(); }
2020-08-04 13:06:59 +00:00
/// Max number of processors which can be executed in parallel for each step (value of max_parallel_streams).
size_t maxParallelStreams() const { return max_parallel_streams; }
2020-07-31 16:54:54 +00:00
/// Main output port at index `pos`. The index is not range-checked here, so `pos` must be < numOutputPorts().
OutputPort * getOutputPort(size_t pos) const { return output_ports[pos]; }
/// Port carrying totals, or nullptr when the Pipe has none.
OutputPort * getTotalsPort() const { return totals_port; }
/// Port carrying extremes, or nullptr when the Pipe has none.
OutputPort * getExtremesPort() const { return extremes_port; }
/// Add processor to list, add its output ports to output_ports.
/// Processor shouldn't have input ports, output ports shouldn't be connected.
/// Output headers should have same structure and be compatible with current header (if not empty()).
2020-08-04 13:06:59 +00:00
void addSource(ProcessorPtr source);
2020-07-31 16:54:54 +00:00
2020-08-03 11:33:11 +00:00
/// Add totals and extremes.
void addTotalsSource(ProcessorPtr source);
void addExtremesSource(ProcessorPtr source);
2020-08-04 15:51:56 +00:00
/// Drop totals and extremes (create NullSink for them).
void dropTotals();
void dropExtremes();
2020-07-31 16:54:54 +00:00
/// Add processor to list. It should have numOutputPorts() input ports with compatible header.
/// Output ports should have same headers.
/// If totals or extremes are not empty, transform shouldn't change header.
void addTransform(ProcessorPtr transform);
2020-08-04 15:51:56 +00:00
void addTransform(ProcessorPtr transform, OutputPort * totals, OutputPort * extremes);
void addTransform(ProcessorPtr transform, InputPort * totals, InputPort * extremes);
2020-07-31 16:54:54 +00:00
enum class StreamType
{
Main = 0, /// Stream for query data. There may be several streams of this type.
2020-12-29 10:16:22 +00:00
Totals, /// Stream for totals. No more than one.
Extremes, /// Stream for extremes. No more than one.
2020-07-31 16:54:54 +00:00
};
2020-08-03 13:54:14 +00:00
using ProcessorGetter = std::function<ProcessorPtr(const Block & header)>;
using ProcessorGetterWithStreamKind = std::function<ProcessorPtr(const Block & header, StreamType stream_type)>;
2020-07-31 16:54:54 +00:00
/// Add transform with single input and single output for each port.
2020-08-06 12:24:05 +00:00
void addSimpleTransform(const ProcessorGetter & getter);
void addSimpleTransform(const ProcessorGetterWithStreamKind & getter);
2020-07-31 16:54:54 +00:00
/// Add chain to every output port.
void addChains(std::vector<Chain> chains);
2022-07-27 13:35:22 +00:00
/// Changes the number of output ports if needed. Adds (Strict)ResizeProcessor.
2020-10-12 09:30:05 +00:00
void resize(size_t num_streams, bool force = false, bool strict = false);
2020-08-03 11:33:11 +00:00
using Transformer = std::function<Processors(OutputPortRawPtrs ports)>;
/// Transform Pipe in general way.
void transform(const Transformer & transformer);
/// Unite several pipes together. They should have same header.
static Pipe unitePipes(Pipes pipes);
2020-07-31 16:54:54 +00:00
2020-08-04 13:06:59 +00:00
/// Take the processors out of a Pipe. Use with caution: it is easy to lose the totals and extremes ports this way.
static Processors detachProcessors(Pipe pipe)
{
    Processors detached = std::move(pipe.processors);
    return detached;
}
2022-04-17 23:02:49 +00:00
/// Get processors from Pipe without destroying pipe (used for EXPLAIN to keep QueryPlan).
Fix QueryPlan lifetime (for EXPLAIN PIPELINE graph=1) for queries with nested interpreter Example of such queries are distributed queries, which creates local InterpreterSelectQuery, which will have it's own QueryPlan but returns Pipes that has that IQueryPlanStep attached. After EXPLAIN PIPELINE graph=1 tries to use them, and will get SIGSEGV. - TSAN: <details> ``` ==2782113==ERROR: AddressSanitizer: heap-use-after-free on address 0x6120000223c0 at pc 0x00002b8f3f3e bp 0x7fff18cfbff0 sp 0x7fff18cfbfe8 READ of size 8 at 0x6120000223c0 thread T22 (TCPHandler) #0 0x2b8f3f3d in DB::printPipelineCompact(std::__1::vector<std::__1::shared_ptr<DB::IProcessor>, std::__1::allocator<std::__1::shared_ptr<DB::IProcessor> > > const&, DB::WriteBuffer&, bool) /build/obj-x86_64-linux-gnu/../src/Processors/printPipeline.cpp:116:53 #1 0x29ee698c in DB::InterpreterExplainQuery::executeImpl() /build/obj-x86_64-linux-gnu/../src/Interpreters/InterpreterExplainQuery.cpp:275:17 #2 0x29ee2e40 in DB::InterpreterExplainQuery::execute() /build/obj-x86_64-linux-gnu/../src/Interpreters/InterpreterExplainQuery.cpp:73:14 #3 0x2a7b44a2 in DB::executeQueryImpl(char const*, char const*, DB::Context&, bool, DB::QueryProcessingStage::Enum, bool, DB::ReadBuffer*) /build/obj-x86_64-linux-gnu/../src/Interpreters/executeQuery.cpp:389:28 #4 0x2a7b1cb3 in DB::executeQuery(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, DB::Context&, bool, DB::QueryProcessingStage::Enum, bool) /build/obj-x86_64-linux-gnu/../src/Interpreters/executeQuery.cpp:675:30 #5 0x2b7993b2 in DB::TCPHandler::runImpl() /build/obj-x86_64-linux-gnu/../src/Server/TCPHandler.cpp:253:24 #6 0x2b7b649a in DB::TCPHandler::run() /build/obj-x86_64-linux-gnu/../src/Server/TCPHandler.cpp:1217:9 #7 0x31d9c57e in Poco::Net::TCPServerConnection::start() /build/obj-x86_64-linux-gnu/../contrib/poco/Net/src/TCPServerConnection.cpp:43:3 #8 0x31d9d281 in Poco::Net::TCPServerDispatcher::run() 
/build/obj-x86_64-linux-gnu/../contrib/poco/Net/src/TCPServerDispatcher.cpp:114:20 #9 0x3206b5d5 in Poco::PooledThread::run() /build/obj-x86_64-linux-gnu/../contrib/poco/Foundation/src/ThreadPool.cpp:199:14 #10 0x320657ad in Poco::ThreadImpl::runnableEntry(void*) /build/obj-x86_64-linux-gnu/../contrib/poco/Foundation/src/Thread_POSIX.cpp:345:27 #11 0x7ffff7f853e8 in start_thread (/usr/lib/libpthread.so.0+0x93e8) #12 0x7ffff7ea2292 in clone (/usr/lib/libc.so.6+0x100292) 0x6120000223c0 is located 0 bytes inside of 272-byte region [0x6120000223c0,0x6120000224d0) freed by thread T22 (TCPHandler) here: #0 0x122f3b62 in operator delete(void*, unsigned long) (/src/ch/tmp/master-20200831/clickhouse+0x122f3b62) #1 0x2bd9e9fa in std::__1::default_delete<DB::IQueryPlanStep>::operator()(DB::IQueryPlanStep*) const /build/obj-x86_64-linux-gnu/../contrib/libcxx/include/memory:2363:5 #2 0x2bd9e9fa in std::__1::unique_ptr<DB::IQueryPlanStep, std::__1::default_delete<DB::IQueryPlanStep> >::reset(DB::IQueryPlanStep*) /build/obj-x86_64-linux-gnu/../contrib/libcxx/include/memory:2618:7 #3 0x2bd9e9fa in std::__1::unique_ptr<DB::IQueryPlanStep, std::__1::default_delete<DB::IQueryPlanStep> >::~unique_ptr() /build/obj-x86_64-linux-gnu/../contrib/libcxx/include/memory:2572:19 #4 0x2bd9e9fa in DB::QueryPlan::Node::~Node() /build/obj-x86_64-linux-gnu/../src/Processors/QueryPlan/QueryPlan.h:66:12 #5 0x2bd9e9fa in void std::__1::allocator_traits<std::__1::allocator<std::__1::__list_node<DB::QueryPlan::Node, void*> > >::__destroy<DB::QueryPlan::Node>(std::__1::integral_constant<bool, false>, std::__1::allocator<std::__1::__list_node<DB::QueryPlan::Node, void*> >&, DB::QueryPlan::Node*) /build/obj-x86_64-linux-gnu/../contrib/libcxx/include/memory:1798:23 #6 0x2bd9e9fa in void std::__1::allocator_traits<std::__1::allocator<std::__1::__list_node<DB::QueryPlan::Node, void*> > >::destroy<DB::QueryPlan::Node>(std::__1::allocator<std::__1::__list_node<DB::QueryPlan::Node, void*> >&, 
DB::QueryPlan::Node*) /build/obj-x86_64-lin ux-gnu/../contrib/libcxx/include/memory:1630:14 #7 0x2bd9e9fa in std::__1::__list_imp<DB::QueryPlan::Node, std::__1::allocator<DB::QueryPlan::Node> >::clear() /build/obj-x86_64-linux-gnu/../contrib/libcxx/include/list:762:13 #8 0x29fece08 in DB::InterpreterSelectQuery::execute() /build/obj-x86_64-linux-gnu/../src/Interpreters/InterpreterSelectQuery.cpp:492:1 #9 0x2abf7484 in DB::ClusterProxy::(anonymous namespace)::createLocalStream(std::__1::shared_ptr<DB::IAST> const&, DB::Block const&, DB::Context const&, DB::QueryProcessingStage::Enum) /build/obj-x86_64-linux-gnu/../src/Interpreters/ClusterProxy/SelectStreamFactory.cpp: 78:33 #10 0x2abea85d in DB::ClusterProxy::SelectStreamFactory::createForShard(DB::Cluster::ShardInfo const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::shared_ptr<DB::IAST> const&, DB::Context const&, std::__1::shar ed_ptr<DB::Throttler> const&, DB::SelectQueryInfo const&, std::__1::vector<DB::Pipe, std::__1::allocator<DB::Pipe> >&)::$_0::operator()() const /build/obj-x86_64-linux-gnu/../src/Interpreters/ClusterProxy/SelectStreamFactory.cpp:133:51 #11 0x2abea85d in DB::ClusterProxy::SelectStreamFactory::createForShard(DB::Cluster::ShardInfo const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::shared_ptr<DB::IAST> const&, DB::Context const&, std::__1::shar ed_ptr<DB::Throttler> const&, DB::SelectQueryInfo const&, std::__1::vector<DB::Pipe, std::__1::allocator<DB::Pipe> >&) /build/obj-x86_64-linux-gnu/../src/Interpreters/ClusterProxy/SelectStreamFactory.cpp:189:13 #12 0x2abe6d99 in DB::ClusterProxy::executeQuery(DB::ClusterProxy::IStreamFactory&, std::__1::shared_ptr<DB::Cluster> const&, Poco::Logger*, std::__1::shared_ptr<DB::IAST> const&, DB::Context const&, DB::Settings const&, DB::SelectQueryInfo const&) /build/obj-x86_64-lin 
ux-gnu/../src/Interpreters/ClusterProxy/executeQuery.cpp:107:24 #13 0x2abc4b74 in DB::StorageDistributed::read(std::__1::vector<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::allocator<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > > > const&, s td::__1::shared_ptr<DB::StorageInMemoryMetadata const> const&, DB::SelectQueryInfo const&, DB::Context const&, DB::QueryProcessingStage::Enum, unsigned long, unsigned int) /build/obj-x86_64-linux-gnu/../src/Storages/StorageDistributed.cpp:514:12 #14 0x2bda1c5a in DB::ReadFromStorageStep::ReadFromStorageStep(std::__1::shared_ptr<DB::RWLockImpl::LockHolderImpl>, std::__1::shared_ptr<DB::StorageInMemoryMetadata const>&, DB::SelectQueryOptions, std::__1::shared_ptr<DB::IStorage>, std::__1::vector<std::__1::basic_st ring<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::allocator<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > > > const&, DB::SelectQueryInfo const&, std::__1::shared_ptr<DB::Context>, DB::QueryProcessingStage ::Enum, unsigned long, unsigned long) /build/obj-x86_64-linux-gnu/../src/Processors/QueryPlan/ReadFromStorageStep.cpp:39:26 #15 0x2a01ca70 in std::__1::__unique_if<DB::ReadFromStorageStep>::__unique_single std::__1::make_unique<DB::ReadFromStorageStep, std::__1::shared_ptr<DB::RWLockImpl::LockHolderImpl>&, std::__1::shared_ptr<DB::StorageInMemoryMetadata const>&, DB::SelectQueryOptions&, std ::__1::shared_ptr<DB::IStorage>&, std::__1::vector<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::allocator<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > > >&, DB::SelectQueryInfo&, st d::__1::shared_ptr<DB::Context>&, DB::QueryProcessingStage::Enum&, unsigned long&, unsigned long&>(std::__1::shared_ptr<DB::RWLockImpl::LockHolderImpl>&, std::__1::shared_ptr<DB::StorageInMemoryMetadata 
const>&, DB::SelectQueryOptions&, std::__1::shared_ptr<DB::IStorage>&, std::__1::vector<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::allocator<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > > >&, DB::SelectQueryInfo&, std::__1::shared_ptr<DB::Context>&, DB::QueryProcessingStage::Enum&, unsigned long&, unsigned long&) /build/obj-x86_64-linux-gnu/../contrib/libcxx/include/memory:3028:32 #16 0x29ff556a in DB::InterpreterSelectQuery::executeFetchColumns(DB::QueryProcessingStage::Enum, DB::QueryPlan&, std::__1::shared_ptr<DB::PrewhereInfo> const&, std::__1::vector<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std:: __1::allocator<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > > > const&) /build/obj-x86_64-linux-gnu/../src/Interpreters/InterpreterSelectQuery.cpp:1383:26 #17 0x29fe6b83 in DB::InterpreterSelectQuery::executeImpl(DB::QueryPlan&, std::__1::shared_ptr<DB::IBlockInputStream> const&, std::__1::optional<DB::Pipe>) /build/obj-x86_64-linux-gnu/../src/Interpreters/InterpreterSelectQuery.cpp:795:9 #18 0x29fe5771 in DB::InterpreterSelectQuery::buildQueryPlan(DB::QueryPlan&) /build/obj-x86_64-linux-gnu/../src/Interpreters/InterpreterSelectQuery.cpp:473:5 #19 0x2a47d370 in DB::InterpreterSelectWithUnionQuery::buildQueryPlan(DB::QueryPlan&) /build/obj-x86_64-linux-gnu/../src/Interpreters/InterpreterSelectWithUnionQuery.cpp:182:38 #20 0x29ee5bff in DB::InterpreterExplainQuery::executeImpl() /build/obj-x86_64-linux-gnu/../src/Interpreters/InterpreterExplainQuery.cpp:265:21 #21 0x29ee2e40 in DB::InterpreterExplainQuery::execute() /build/obj-x86_64-linux-gnu/../src/Interpreters/InterpreterExplainQuery.cpp:73:14 #22 0x2a7b44a2 in DB::executeQueryImpl(char const*, char const*, DB::Context&, bool, DB::QueryProcessingStage::Enum, bool, DB::ReadBuffer*) 
/build/obj-x86_64-linux-gnu/../src/Interpreters/executeQuery.cpp:389:28 #23 0x2a7b1cb3 in DB::executeQuery(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, DB::Context&, bool, DB::QueryProcessingStage::Enum, bool) /build/obj-x86_64-linux-gnu/../src/Interpreters/executeQuery.cpp:675:30 #24 0x2b7993b2 in DB::TCPHandler::runImpl() /build/obj-x86_64-linux-gnu/../src/Server/TCPHandler.cpp:253:24 #25 0x2b7b649a in DB::TCPHandler::run() /build/obj-x86_64-linux-gnu/../src/Server/TCPHandler.cpp:1217:9 #26 0x31d9c57e in Poco::Net::TCPServerConnection::start() /build/obj-x86_64-linux-gnu/../contrib/poco/Net/src/TCPServerConnection.cpp:43:3 #27 0x31d9d281 in Poco::Net::TCPServerDispatcher::run() /build/obj-x86_64-linux-gnu/../contrib/poco/Net/src/TCPServerDispatcher.cpp:114:20 #28 0x3206b5d5 in Poco::PooledThread::run() /build/obj-x86_64-linux-gnu/../contrib/poco/Foundation/src/ThreadPool.cpp:199:14 #29 0x320657ad in Poco::ThreadImpl::runnableEntry(void*) /build/obj-x86_64-linux-gnu/../contrib/poco/Foundation/src/Thread_POSIX.cpp:345:27 #30 0x7ffff7f853e8 in start_thread (/usr/lib/libpthread.so.0+0x93e8) ``` </details>
2020-08-31 23:22:33 +00:00
/// Read-only access to the processor list; nothing is moved out of the Pipe.
const Processors & getProcessors() const { return processors; }
2020-08-04 13:06:59 +00:00
2020-07-31 16:54:54 +00:00
private:
/// Header is common for all output below.
Block header;
2020-08-04 13:06:59 +00:00
Processors processors;
2020-07-31 16:54:54 +00:00
/// Output ports. Totals and extremes are allowed to be empty.
2020-08-03 11:33:11 +00:00
OutputPortRawPtrs output_ports;
2020-07-31 16:54:54 +00:00
OutputPort * totals_port = nullptr;
OutputPort * extremes_port = nullptr;
2020-08-04 13:06:59 +00:00
/// It is the max number of processors which can be executed in parallel for each step.
2020-07-31 16:54:54 +00:00
/// Usually, it's the same as the number of output ports.
size_t max_parallel_streams = 0;
2020-08-03 15:54:53 +00:00
/// If it is set, all newly created processors will be added to this too.
/// It is needed for debug. See QueryPipelineProcessorsCollector.
Processors * collected_processors = nullptr;
2020-08-04 13:06:59 +00:00
/// These methods are for QueryPipeline. It is allowed to complete the graph only there.
/// So, we may be sure that a Pipe always has an output port if it is not empty.
/// A Pipe counts as completed when it holds processors but has no output ports left.
bool isCompleted() const
{
    if (empty())
        return false;
    return output_ports.empty();
}
static Pipe unitePipes(Pipes pipes, Processors * collected_processors, bool allow_empty_header);
2020-08-04 13:06:59 +00:00
void setSinks(const Pipe::ProcessorGetterWithStreamKind & getter);
2020-08-03 15:54:53 +00:00
friend class QueryPipelineBuilder;
2021-09-15 19:35:48 +00:00
friend class QueryPipeline;
2020-08-03 11:33:11 +00:00
};
2020-07-31 16:54:54 +00:00
}