ClickHouse/dbms/src/Processors/QueryPipeline.h
Nikolai Kochetov 8ce6040306
Merge pull request #7796 from ClickHouse/processors-4.3
Processors and system.numbers
2020-01-29 13:11:13 +03:00

138 lines
4.7 KiB
C++

#pragma once
#include <Processors/IProcessor.h>
#include <Processors/Executors/PipelineExecutor.h>
#include <Processors/Pipe.h>
#include <DataStreams/IBlockInputStream.h>
#include <DataStreams/IBlockOutputStream.h>
#include <Storages/IStorage_fwd.h>
namespace DB
{
/// Forward declaration; only used below to form the shared-pointer alias.
class TableStructureReadLock;
/// Shared-ownership handle to a TableStructureReadLock.
using TableStructureReadLockPtr = std::shared_ptr<TableStructureReadLock>;
/// Collection of lock holders. Note that the element type is TableStructureReadLockHolder
/// (declared outside this header), not the TableStructureReadLock forward-declared above.
using TableStructureReadLocks = std::vector<TableStructureReadLockHolder>;
/// Forward declaration: Context is only used by reference / shared_ptr in this header.
class Context;
/// Forward declaration: IOutputFormat is only used through a raw pointer in this header.
class IOutputFormat;
/// Holds the processors added for a query together with the output ports of its
/// current streams, the optional totals / extremes ports, and the resources
/// (table locks, interpreter contexts, storages) that must be kept alive with them.
class QueryPipeline
{
public:
    QueryPipeline() = default;

    /// All pipes must have same header.
    void init(Pipes pipes);
    void init(Pipe pipe); /// Simple init for single pipe

    /// True once at least one processor has been stored in the pipeline.
    /// Declared const because it only inspects `processors`.
    bool initialized() const { return !processors.empty(); }

    /// Type of logical data stream for simple transform.
    /// Sometimes it's important to know which part of pipeline we are working for.
    /// Example: ExpressionTransform needs special logic for totals.
    enum class StreamType
    {
        Main = 0, /// Stream for query data. There may be several streams of this type.
        Totals,   /// Stream for totals. No more than one.
        Extremes, /// Stream for extremes. No more than one.
    };

    /// Factory callbacks that build a processor for a given header
    /// (the second form additionally receives the kind of stream).
    using ProcessorGetter = std::function<ProcessorPtr(const Block & header)>;
    using ProcessorGetterWithStreamKind = std::function<ProcessorPtr(const Block & header, StreamType stream_type)>;

    /// Pipeline construction steps; all of them are defined out of line.
    void addSimpleTransform(const ProcessorGetter & getter);
    void addSimpleTransform(const ProcessorGetterWithStreamKind & getter);
    void addPipe(Processors pipe);
    void addTotalsHavingTransform(ProcessorPtr transform);
    void addExtremesTransform(ProcessorPtr transform);
    void addCreatingSetsTransform(ProcessorPtr transform);
    void setOutput(ProcessorPtr output);

    /// Add totals which returns one chunk with single row with defaults.
    void addDefaultTotals();

    /// Add already calculated totals.
    void addTotals(ProcessorPtr source);

    void dropTotalsIfHas();

    /// Will read from this stream after all data was read from other streams.
    void addDelayedStream(ProcessorPtr source);

    /// Check if resize transform was used. (In that case another distinct transform will be added).
    /// Also true whenever there is more than one stream.
    bool hasMixedStreams() const { return has_resize || hasMoreThanOneStream(); }

    void resize(size_t num_streams, bool force = false, bool strict = false);

    void enableQuotaForCurrentStreams();

    void unitePipelines(std::vector<QueryPipeline> && pipelines, const Block & common_header, const Context & context);

    PipelineExecutorPtr execute();

    /// Number of entries in `streams` (one output port per stream).
    size_t getNumStreams() const { return streams.size(); }
    bool hasMoreThanOneStream() const { return getNumStreams() > 1; }

    /// True when a totals port has been set.
    bool hasTotals() const { return totals_having_port != nullptr; }

    /// Common header for each stream.
    const Block & getHeader() const { return current_header; }

    /// The following three methods append their argument to the corresponding
    /// holder vector so that it is stored inside the pipeline object.
    void addTableLock(const TableStructureReadLockHolder & lock) { table_locks.push_back(lock); }
    void addInterpreterContext(std::shared_ptr<Context> context) { interpreter_context.emplace_back(std::move(context)); }
    void addStorageHolder(StoragePtr storage) { storage_holder.emplace_back(std::move(storage)); }

    /// For compatibility with IBlockInputStream.
    void setProgressCallback(const ProgressCallback & callback);
    void setProcessListElement(QueryStatus * elem);

    /// Call after execution.
    void finalize();

    /// Store / read back the `max_threads` value (0 until it is set).
    void setMaxThreads(size_t max_threads_) { max_threads = max_threads_; }
    size_t getMaxThreads() const { return max_threads; }

private:
    /// All added processors.
    Processors processors;

    /// Port for each independent "stream".
    std::vector<OutputPort *> streams;

    /// Special ports for extremes and totals having.
    OutputPort * totals_having_port = nullptr;
    OutputPort * extremes_port = nullptr;

    /// If resize processor was added to pipeline.
    bool has_resize = false;

    /// Common header for each stream.
    Block current_header;

    TableStructureReadLocks table_locks;

    /// Some Streams (or Processors) may implicitly use Context or temporary Storage created by Interpreter.
    /// But lifetime of Streams is not nested in lifetime of Interpreters, so we have to store it here,
    /// because QueryPipeline is alive until query is finished.
    std::vector<std::shared_ptr<Context>> interpreter_context;
    std::vector<StoragePtr> storage_holder;

    /// Non-owning raw pointers; null until assigned (assignment happens out of line).
    IOutputFormat * output_format = nullptr;

    size_t max_threads = 0;

    QueryStatus * process_list_element = nullptr;

    void checkInitialized();
    void checkSource(const ProcessorPtr & source, bool can_have_totals);

    /// Shared implementation behind both addSimpleTransform overloads
    /// (NOTE(review): inferred from the name and template parameter; definition is not in this header).
    template <typename TProcessorGetter>
    void addSimpleTransformImpl(const TProcessorGetter & getter);

    void calcRowsBeforeLimit();
};
}