Less wrappers for procesors pipeline.

This commit is contained in:
Nikolai Kochetov 2019-11-05 20:33:03 +03:00
parent a0e9f9fdcf
commit 055f21c9b7
9 changed files with 143 additions and 40 deletions

View File

@ -92,6 +92,7 @@
#include <Processors/Transforms/FinishSortingTransform.h>
#include <DataTypes/DataTypeAggregateFunction.h>
#include <DataStreams/materializeBlock.h>
#include <Processors/Pipe.h>
namespace DB
@ -955,7 +956,7 @@ void InterpreterSelectQuery::executeImpl(TPipeline & pipeline, const BlockInputS
if (options.only_analyze)
{
if constexpr (pipeline_with_processors)
pipeline.init({std::make_shared<NullSource>(source_header)});
pipeline.init(Pipe(std::make_shared<NullSource>(source_header)));
else
pipeline.streams.emplace_back(std::make_shared<NullBlockInputStream>(source_header));
@ -999,7 +1000,7 @@ void InterpreterSelectQuery::executeImpl(TPipeline & pipeline, const BlockInputS
if (prepared_input)
{
if constexpr (pipeline_with_processors)
pipeline.init({std::make_shared<SourceFromInputStream>(prepared_input)});
pipeline.init(Pipe(std::make_shared<SourceFromInputStream>(prepared_input)));
else
pipeline.streams.push_back(prepared_input);
}
@ -1322,7 +1323,7 @@ void InterpreterSelectQuery::executeFetchColumns(
auto istream = std::make_shared<OneBlockInputStream>(block_with_count);
if constexpr (pipeline_with_processors)
pipeline.init({std::make_shared<SourceFromInputStream>(istream)});
pipeline.init(Pipe(std::make_shared<SourceFromInputStream>(istream)));
else
pipeline.streams.emplace_back(istream);
from_stage = QueryProcessingStage::WithMergeableState;
@ -1587,9 +1588,19 @@ void InterpreterSelectQuery::executeFetchColumns(
query_info.prewhere_info = prewhere_info;
query_info.sorting_info = sorting_info;
auto streams = storage->read(required_columns, query_info, context, processing_stage, max_block_size, max_streams);
BlockInputStreams streams;
Pipes pipes;
if (streams.empty())
/// Will work with pipes directly if storage support processors.
/// Code is temporarily copy-pasted while moving to new pipeline.
bool use_pipes = pipeline_with_processors && storage->supportProcessorsPipeline();
if (use_pipes)
pipes = storage->readWithProcessors(required_columns, query_info, context, processing_stage, max_block_size, max_streams);
else
streams = storage->read(required_columns, query_info, context, processing_stage, max_block_size, max_streams);
if (streams.empty() && !use_pipes)
{
streams = {std::make_shared<NullBlockInputStream>(storage->getSampleBlockForColumns(required_columns))};
@ -1612,9 +1623,34 @@ void InterpreterSelectQuery::executeFetchColumns(
}
}
/// Copy-paste from prev if.
if (pipes.empty() && use_pipes)
{
Pipe pipe(std::make_shared<NullSource>(storage->getSampleBlockForColumns(required_columns)));
if (query_info.prewhere_info)
{
pipe.addSimpleTransform(std::make_shared<FilterTransform>(
pipe.getHeader(),
prewhere_info->prewhere_actions,
prewhere_info->prewhere_column_name,
prewhere_info->remove_prewhere_column));
if (query_info.prewhere_info->remove_columns_actions)
pipe.addSimpleTransform(std::make_shared<ExpressionTransform>(pipe.getHeader(), query_info.prewhere_info->remove_columns_actions));
}
}
for (auto & stream : streams)
stream->addTableLock(table_lock);
if constexpr (pipeline_with_processors)
{
/// Table lock is stored inside pipeline here.
if (use_pipes)
pipeline.addTableLock(table_lock);
}
/// Set the limits and quota for reading data, the speed and time of the query.
{
IBlockInputStream::LocalLimits limits;
@ -1649,11 +1685,21 @@ void InterpreterSelectQuery::executeFetchColumns(
if (options.to_stage == QueryProcessingStage::Complete)
stream->setQuota(quota);
}
/// Copy-paste
for (auto & pipe : pipes)
{
if (!options.ignore_limits)
pipe.setLimits(limits);
if (options.to_stage == QueryProcessingStage::Complete)
pipe.setQuota(quota);
}
}
if constexpr (pipeline_with_processors)
{
if (streams.size() == 1)
if (streams.size() == 1 || pipes.size() == 1)
pipeline.setMaxThreads(streams.size());
/// Unify streams. They must have same headers.
@ -1665,9 +1711,8 @@ void InterpreterSelectQuery::executeFetchColumns(
if (first_header.columns() > 1 && first_header.has("_dummy"))
first_header.erase("_dummy");
for (size_t i = 0; i < streams.size(); ++i)
for (auto & stream : streams)
{
auto & stream = streams[i];
auto header = stream->getHeader();
auto mode = ConvertingBlockInputStream::MatchColumnsMode::Name;
if (!blocksHaveEqualStructure(first_header, header))
@ -1675,12 +1720,6 @@ void InterpreterSelectQuery::executeFetchColumns(
}
}
Processors sources;
sources.reserve(streams.size());
/// Pin sources for merge tree tables.
bool pin_sources = dynamic_cast<const MergeTreeData *>(storage.get()) != nullptr;
for (auto & stream : streams)
{
bool force_add_agg_info = processing_stage == QueryProcessingStage::WithMergeableState;
@ -1689,13 +1728,18 @@ void InterpreterSelectQuery::executeFetchColumns(
if (processing_stage == QueryProcessingStage::Complete)
source->addTotalsPort();
if (pin_sources)
source->setStream(sources.size());
sources.emplace_back(std::move(source));
pipes.emplace_back(std::move(source));
}
pipeline.init(std::move(sources));
/// Pin sources for merge tree tables.
bool pin_sources = dynamic_cast<const MergeTreeData *>(storage.get()) != nullptr;
if (pin_sources)
{
for (size_t i = 0; i < pipes.size(); ++i)
pipes[i].pinSources(i);
}
pipeline.init(std::move(pipes));
}
else
pipeline.streams = std::move(streams);

View File

@ -14,6 +14,7 @@
#include <Processors/Sources/NullSource.h>
#include <Processors/QueryPipeline.h>
#include <Processors/Pipe.h>
namespace DB
@ -236,7 +237,7 @@ QueryPipeline InterpreterSelectWithUnionQuery::executeWithProcessors()
}
if (!has_main_pipeline)
main_pipeline.init({ std::make_shared<NullSource>(getSampleBlock()) });
main_pipeline.init(Pipe(std::make_shared<NullSource>(getSampleBlock())));
if (!pipelines.empty())
{

View File

@ -44,8 +44,8 @@ static void checkSource(const IProcessor & source)
throw Exception("Source for pipe should have single output, but it doesn't have any",
ErrorCodes::LOGICAL_ERROR);
if (source.getOutputs().size() != 1)
throw Exception("Source for pipe should have single output, but " + source.getName() + " has " +
if (source.getOutputs().size() > 2)
throw Exception("Source for pipe should have single or two outputs, but " + source.getName() + " has " +
toString(source.getOutputs().size()) + " outputs.", ErrorCodes::LOGICAL_ERROR);
}
@ -54,6 +54,10 @@ Pipe::Pipe(ProcessorPtr source)
{
checkSource(*source);
output_port = &source->getOutputs().front();
if (source->getOutputs().size() > 1)
totals = &source->getOutputs().back();
processors.emplace_back(std::move(source));
}
@ -84,4 +88,31 @@ void Pipe::addSimpleTransform(ProcessorPtr transform)
processors.emplace_back(std::move(transform));
}
void Pipe::setLimits(const ISourceWithProgress::LocalLimits & limits)
{
for (auto & processor : processors)
{
if (auto * source_with_progress = typeid_cast<ISourceWithProgress *>(processor.get()))
source_with_progress->setLimits(limits);
}
}
void Pipe::setQuota(QuotaForIntervals & quota)
{
for (auto & processor : processors)
{
if (auto * source_with_progress = typeid_cast<ISourceWithProgress *>(processor.get()))
source_with_progress->setQuota(quota);
}
}
void Pipe::pinSources(size_t executor_number)
{
for (auto & processor : processors)
{
if (auto * source = typeid_cast<ISource *>(processor.get()))
source->setStream(executor_number);
}
}
}

View File

@ -1,4 +1,6 @@
#pragma once
#include <Processors/IProcessor.h>
#include <Processors/Sources/SourceWithProgress.h>
namespace DB
{
@ -6,6 +8,8 @@ namespace DB
class Pipe;
using Pipes = std::vector<Pipe>;
class QuotaForIntervals;
/// Pipe is a set of processors which represents the part of pipeline with single output.
/// All processors in pipe are connected. All ports are connected except the output one.
class Pipe
@ -33,9 +37,20 @@ public:
Processors detachProcessors() && { return std::move(processors); }
/// Specify quotas and limits for every ISourceWithProgress.
void setLimits(const SourceWithProgress::LocalLimits & limits);
void setQuota(QuotaForIntervals & quota);
/// Set information about preferred executor number for sources.
void pinSources(size_t executor_number);
void setTotalsPort(OutputPort * totals_) { totals = totals_; }
OutputPort * getTotalsPort() const { return totals; }
private:
Processors processors;
OutputPort * output_port = nullptr;
OutputPort * totals = nullptr;
};
}

View File

@ -14,7 +14,6 @@
#include <Processors/Executors/PipelineExecutor.h>
#include <Processors/Transforms/PartialSortingTransform.h>
#include <Processors/Sources/SourceFromSingleChunk.h>
#include <IO/WriteHelpers.h>
#include <Interpreters/Context.h>
#include <Common/typeid_cast.h>
@ -48,36 +47,41 @@ void QueryPipeline::checkSource(const ProcessorPtr & source, bool can_have_total
toString(source->getOutputs().size()) + " outputs.", ErrorCodes::LOGICAL_ERROR);
}
void QueryPipeline::init(Processors sources)
void QueryPipeline::init(Pipe pipe)
{
Pipes pipes;
pipes.emplace_back(std::move(pipe));
init(std::move(pipes));
}
void QueryPipeline::init(Pipes pipes)
{
if (initialized())
throw Exception("Pipeline has already been initialized.", ErrorCodes::LOGICAL_ERROR);
if (sources.empty())
throw Exception("Can't initialize pipeline with empty source list.", ErrorCodes::LOGICAL_ERROR);
if (pipes.empty())
throw Exception("Can't initialize pipeline with empty pipes list.", ErrorCodes::LOGICAL_ERROR);
std::vector<OutputPort *> totals;
for (auto & source : sources)
for (auto & pipe : pipes)
{
checkSource(source, true);
auto & header = source->getOutputs().front().getHeader();
auto & header = pipe.getHeader();
if (current_header)
assertBlocksHaveEqualStructure(current_header, header, "QueryPipeline");
else
current_header = header;
if (source->getOutputs().size() > 1)
if (auto * totals_port = pipe.getTotalsPort())
{
assertBlocksHaveEqualStructure(current_header, source->getOutputs().back().getHeader(), "QueryPipeline");
totals.emplace_back(&source->getOutputs().back());
assertBlocksHaveEqualStructure(current_header, totals_port->getHeader(), "QueryPipeline");
totals.emplace_back(totals_port);
}
/// source->setStream(streams.size());
streams.emplace_back(&source->getOutputs().front());
processors.emplace_back(std::move(source));
streams.emplace_back(&pipe.getPort());
auto cur_processors = std::move(pipe).detachProcessors();
processors.insert(processors.end(), cur_processors.begin(), cur_processors.end());
}
if (!totals.empty())

View File

@ -1,6 +1,7 @@
#pragma once
#include <Processors/IProcessor.h>
#include <Processors/Executors/PipelineExecutor.h>
#include <Processors/Pipe.h>
#include <DataStreams/IBlockInputStream.h>
#include <DataStreams/IBlockOutputStream.h>
@ -11,7 +12,7 @@ namespace DB
class TableStructureReadLock;
using TableStructureReadLockPtr = std::shared_ptr<TableStructureReadLock>;
using TableStructureReadLocks = std::vector<TableStructureReadLockPtr>;
using TableStructureReadLocks = std::vector<TableStructureReadLockHolder>;
class Context;
@ -22,8 +23,9 @@ class QueryPipeline
public:
QueryPipeline() = default;
/// Each source must have single output port and no inputs. All outputs must have same header.
void init(Processors sources);
/// All pipes must have same header.
void init(Pipes pipes);
void init(Pipe pipe); /// Simple init for single pipe
bool initialized() { return !processors.empty(); }
enum class StreamType
@ -72,7 +74,7 @@ public:
const Block & getHeader() const { return current_header; }
void addTableLock(const TableStructureReadLockPtr & lock) { table_locks.push_back(lock); }
void addTableLock(const TableStructureReadLockHolder & lock) { table_locks.push_back(lock); }
/// For compatibility with IBlockInputStream.
void setProgressCallback(const ProgressCallback & callback);

View File

@ -268,6 +268,8 @@ public:
throw Exception("Method read is not supported by storage " + getName(), ErrorCodes::NOT_IMPLEMENTED);
}
virtual bool supportProcessorsPipeline() const { return false; }
/** Writes the data to a table.
* Receives a description of the query, which can contain information about the data write method.
* Returns an object by which you can write data sequentially.

View File

@ -45,6 +45,8 @@ public:
size_t max_block_size,
unsigned num_streams) override;
bool supportProcessorsPipeline() const override { return true; }
std::optional<UInt64> totalRows() const override;
BlockOutputStreamPtr write(const ASTPtr & query, const Context & context) override;

View File

@ -97,6 +97,8 @@ public:
size_t max_block_size,
unsigned num_streams) override;
bool supportProcessorsPipeline() const override { return true; }
std::optional<UInt64> totalRows() const override;
BlockOutputStreamPtr write(const ASTPtr & query, const Context & context) override;