// ClickHouse/src/Processors/QueryPipeline.h
#pragma once
#include <Processors/IProcessor.h>
#include <Processors/Executors/PipelineExecutor.h>
#include <Processors/Pipe.h>
#include <DataStreams/IBlockInputStream.h>
#include <DataStreams/IBlockOutputStream.h>
#include <Storages/IStorage_fwd.h>
namespace DB
{
class TableStructureReadLock;
using TableStructureReadLockPtr = std::shared_ptr<TableStructureReadLock>;
2019-11-05 17:33:03 +00:00
using TableStructureReadLocks = std::vector<TableStructureReadLockHolder>;
2019-03-26 18:28:37 +00:00
class Context;
2019-04-08 14:55:20 +00:00
class IOutputFormat;
2019-03-26 18:28:37 +00:00
class QueryPipeline
{
private:
/// It's a wrapper over std::vector<OutputPort *>
/// Is needed to support invariant for max_parallel_streams (see comment below).
class Streams
{
public:
auto size() const { return data.size(); }
auto begin() { return data.begin(); }
auto end() { return data.end(); }
auto & front() { return data.front(); }
auto & back() { return data.back(); }
auto & at(size_t pos) { return data.at(pos); }
auto & operator[](size_t pos) { return data[pos]; }
void clear() { data.clear(); }
void reserve(size_t size_) { data.reserve(size_); }
void addStream(OutputPort * port, size_t port_max_parallel_streams)
{
data.push_back(port);
max_parallel_streams = std::max<size_t>(max_parallel_streams + port_max_parallel_streams, data.size());
}
void addStreams(Streams & other)
{
data.insert(data.end(), other.begin(), other.end());
max_parallel_streams = std::max<size_t>(max_parallel_streams + other.max_parallel_streams, data.size());
}
void assign(std::initializer_list<OutputPort *> list)
{
data = list;
max_parallel_streams = std::max<size_t>(max_parallel_streams, data.size());
}
size_t maxParallelStreams() const { return max_parallel_streams; }
private:
std::vector<OutputPort *> data;
/// It is the max number of processors which can be executed in parallel for each step.
/// Logically, it is the upper limit on the number of threads needed to execute this pipeline.
/// Initially, it is the number of sources. It may be increased after resize, aggregation, etc.
/// This number is never decreased, and it is calculated as max(streams.size()) over all streams while building.
size_t max_parallel_streams = 0;
};
2019-03-26 18:28:37 +00:00
public:
QueryPipeline() = default;
2020-02-27 15:40:11 +00:00
QueryPipeline(QueryPipeline &&) = default;
~QueryPipeline() = default;
QueryPipeline(const QueryPipeline &) = delete;
QueryPipeline & operator= (const QueryPipeline & rhs) = delete;
QueryPipeline & operator= (QueryPipeline && rhs);
2019-03-26 18:28:37 +00:00
2019-11-05 17:33:03 +00:00
/// All pipes must have same header.
void init(Pipes pipes);
void init(Pipe pipe); /// Simple init for single pipe
2019-03-26 18:28:37 +00:00
bool initialized() { return !processors.empty(); }
2020-01-24 17:04:41 +00:00
/// Type of logical data stream for simple transform.
2020-01-24 17:35:17 +00:00
/// Sometimes it's important to know which part of pipeline we are working for.
/// Example: ExpressionTransform need special logic for totals.
2019-04-09 10:17:25 +00:00
enum class StreamType
{
2020-01-24 17:04:41 +00:00
Main = 0, /// Stream for query data. There may be several streams of this type.
Totals, /// Stream for totals. No more then one.
Extremes, /// Stream for extremes. No more then one.
2019-04-09 10:17:25 +00:00
};
2019-03-26 18:28:37 +00:00
using ProcessorGetter = std::function<ProcessorPtr(const Block & header)>;
2019-04-09 10:17:25 +00:00
using ProcessorGetterWithStreamKind = std::function<ProcessorPtr(const Block & header, StreamType stream_type)>;
2019-03-26 18:28:37 +00:00
2019-04-05 10:52:07 +00:00
void addSimpleTransform(const ProcessorGetter & getter);
2019-04-09 10:17:25 +00:00
void addSimpleTransform(const ProcessorGetterWithStreamKind & getter);
2019-03-26 18:28:37 +00:00
void addPipe(Processors pipe);
void addTotalsHavingTransform(ProcessorPtr transform);
2020-04-08 12:40:04 +00:00
void addExtremesTransform();
2019-03-26 18:28:37 +00:00
void addCreatingSetsTransform(ProcessorPtr transform);
void setOutput(ProcessorPtr output);
2019-04-09 14:51:38 +00:00
/// Add totals which returns one chunk with single row with defaults.
void addDefaultTotals();
2019-04-10 16:28:37 +00:00
/// Add already calculated totals.
void addTotals(ProcessorPtr source);
2019-04-17 15:35:22 +00:00
void dropTotalsIfHas();
2019-03-26 18:28:37 +00:00
/// Will read from this stream after all data was read from other streams.
void addDelayedStream(ProcessorPtr source);
2019-05-14 11:04:11 +00:00
/// Check if resize transform was used. (In that case another distinct transform will be added).
2019-05-14 13:13:12 +00:00
bool hasMixedStreams() const { return has_resize || hasMoreThanOneStream(); }
2019-03-26 18:28:37 +00:00
2020-01-13 12:04:02 +00:00
void resize(size_t num_streams, bool force = false, bool strict = false);
2019-03-26 18:28:37 +00:00
2019-12-26 16:52:15 +00:00
void enableQuotaForCurrentStreams();
2020-04-14 21:05:45 +00:00
void unitePipelines(std::vector<QueryPipeline> && pipelines, const Block & common_header);
2019-03-26 18:28:37 +00:00
PipelineExecutorPtr execute();
2019-03-26 18:28:37 +00:00
size_t getNumStreams() const { return streams.size(); }
2019-04-09 14:51:38 +00:00
2019-04-03 11:21:38 +00:00
bool hasMoreThanOneStream() const { return getNumStreams() > 1; }
2019-04-09 14:51:38 +00:00
bool hasTotals() const { return totals_having_port != nullptr; }
2019-03-26 18:28:37 +00:00
const Block & getHeader() const { return current_header; }
2019-11-05 17:33:03 +00:00
void addTableLock(const TableStructureReadLockHolder & lock) { table_locks.push_back(lock); }
void addInterpreterContext(std::shared_ptr<Context> context) { interpreter_context.emplace_back(std::move(context)); }
void addStorageHolder(StoragePtr storage) { storage_holders.emplace_back(std::move(storage)); }
2019-03-26 18:28:37 +00:00
/// For compatibility with IBlockInputStream.
void setProgressCallback(const ProgressCallback & callback);
void setProcessListElement(QueryStatus * elem);
/// Recommend number of threads for pipeline execution.
size_t getNumThreads() const
{
auto num_threads = streams.maxParallelStreams();
if (max_threads)
num_threads = std::min(num_threads, max_threads);
return std::max<size_t>(1, num_threads);
}
/// Set upper limit for the recommend number of threads
2019-08-27 18:37:28 +00:00
void setMaxThreads(size_t max_threads_) { max_threads = max_threads_; }
/// Convert query pipeline to single pipe.
Pipe getPipe() &&;
2019-03-26 18:28:37 +00:00
private:
2020-02-27 15:40:11 +00:00
/// Destruction order: processors, header, locks, temporary storages, local contexts
/// Some Streams (or Processors) may implicitly use Context or temporary Storage created by Interpreter.
/// But lifetime of Streams is not nested in lifetime of Interpreters, so we have to store it here,
/// because QueryPipeline is alive until query is finished.
std::vector<std::shared_ptr<Context>> interpreter_context;
std::vector<StoragePtr> storage_holders;
TableStructureReadLocks table_locks;
/// Common header for each stream.
Block current_header;
2019-03-26 18:28:37 +00:00
/// All added processors.
Processors processors;
/// Port for each independent "stream".
Streams streams;
2019-03-26 18:28:37 +00:00
/// Special ports for extremes and totals having.
OutputPort * totals_having_port = nullptr;
OutputPort * extremes_port = nullptr;
2019-05-14 11:04:11 +00:00
/// If resize processor was added to pipeline.
2019-05-14 13:13:12 +00:00
bool has_resize = false;
2019-05-14 11:04:11 +00:00
2019-04-08 14:55:20 +00:00
IOutputFormat * output_format = nullptr;
2019-03-26 18:28:37 +00:00
/// Limit on the number of threads. Zero means no limit.
/// Sometimes, more streams are created then the number of threads for more optimal execution.
2019-08-27 18:37:28 +00:00
size_t max_threads = 0;
QueryStatus * process_list_element = nullptr;
2019-03-26 18:28:37 +00:00
void checkInitialized();
2020-03-18 00:57:00 +00:00
static void checkSource(const ProcessorPtr & source, bool can_have_totals);
2019-04-08 14:55:20 +00:00
2019-04-09 10:17:25 +00:00
template <typename TProcessorGetter>
void addSimpleTransformImpl(const TProcessorGetter & getter);
void initRowsBeforeLimit();
2019-03-26 18:28:37 +00:00
};
}