ClickHouse/src/Processors/Pipe.cpp
2021-07-23 17:25:35 +03:00

875 lines
30 KiB
C++

#include <Processors/Pipe.h>
#include <IO/WriteHelpers.h>
#include <Processors/Sources/SourceFromInputStream.h>
#include <Processors/ResizeProcessor.h>
#include <Processors/ConcatProcessor.h>
#include <Processors/LimitTransform.h>
#include <Processors/Sinks/NullSink.h>
#include <Processors/Sinks/EmptySink.h>
#include <Processors/Transforms/ExtremesTransform.h>
#include <Processors/Formats/IOutputFormat.h>
#include <Processors/Sources/NullSource.h>
#include <Columns/ColumnConst.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
static void checkSource(const IProcessor & source)
{
if (!source.getInputs().empty())
throw Exception("Source for pipe shouldn't have any input, but " + source.getName() + " has " +
toString(source.getInputs().size()) + " inputs.", ErrorCodes::LOGICAL_ERROR);
if (source.getOutputs().empty())
throw Exception("Source for pipe should have single output, but it doesn't have any",
ErrorCodes::LOGICAL_ERROR);
if (source.getOutputs().size() > 1)
throw Exception("Source for pipe should have single output, but " + source.getName() + " has " +
toString(source.getOutputs().size()) + " outputs.", ErrorCodes::LOGICAL_ERROR);
}
static OutputPort * uniteExtremes(const OutputPortRawPtrs & ports, const Block & header, Processors & processors)
{
if (ports.empty())
return nullptr;
if (ports.size() == 1)
return ports.front();
/// Here we calculate extremes for extremes in case we unite several pipelines.
/// Example: select number from numbers(2) union all select number from numbers(3)
/// ->> Resize -> Extremes --(output port)----> Empty
/// --(extremes port)--> ...
auto resize = std::make_shared<ResizeProcessor>(header, ports.size(), 1);
auto extremes = std::make_shared<ExtremesTransform>(header);
auto sink = std::make_shared<EmptySink>(header);
auto * extremes_port = &extremes->getExtremesPort();
auto in = resize->getInputs().begin();
for (const auto & port : ports)
connect(*port, *(in++));
connect(resize->getOutputs().front(), extremes->getInputPort());
connect(extremes->getOutputPort(), sink->getPort());
processors.emplace_back(std::move(resize));
processors.emplace_back(std::move(extremes));
processors.emplace_back(std::move(sink));
return extremes_port;
}
static OutputPort * uniteTotals(const OutputPortRawPtrs & ports, const Block & header, Processors & processors)
{
if (ports.empty())
return nullptr;
if (ports.size() == 1)
return ports.front();
/// Calculate totals from several streams.
/// Take totals from first sources which has any, skip others.
/// ->> Concat -> Limit
auto concat = std::make_shared<ConcatProcessor>(header, ports.size());
auto limit = std::make_shared<LimitTransform>(header, 1, 0);
auto * totals_port = &limit->getOutputPort();
auto in = concat->getInputs().begin();
for (const auto & port : ports)
connect(*port, *(in++));
connect(concat->getOutputs().front(), limit->getInputPort());
processors.emplace_back(std::move(concat));
processors.emplace_back(std::move(limit));
return totals_port;
}
Pipe::Holder & Pipe::Holder::operator=(Holder && rhs)
{
table_locks.insert(table_locks.end(), rhs.table_locks.begin(), rhs.table_locks.end());
storage_holders.insert(storage_holders.end(), rhs.storage_holders.begin(), rhs.storage_holders.end());
interpreter_context.insert(interpreter_context.end(),
rhs.interpreter_context.begin(), rhs.interpreter_context.end());
for (auto & plan : rhs.query_plans)
query_plans.emplace_back(std::move(plan));
query_id_holder = std::move(rhs.query_id_holder);
return *this;
}
Pipe::Pipe(ProcessorPtr source, OutputPort * output, OutputPort * totals, OutputPort * extremes)
{
if (!source->getInputs().empty())
throw Exception("Source for pipe shouldn't have any input, but " + source->getName() + " has " +
toString(source->getInputs().size()) + " inputs.", ErrorCodes::LOGICAL_ERROR);
if (!output)
throw Exception("Cannot create Pipe from source because specified output port is nullptr",
ErrorCodes::LOGICAL_ERROR);
if (output == totals || output == extremes || (totals && totals == extremes))
throw Exception("Cannot create Pipe from source because some of specified ports are the same",
ErrorCodes::LOGICAL_ERROR);
header = output->getHeader();
/// Check that ports belong to source and all ports from source were specified.
{
auto & outputs = source->getOutputs();
size_t num_specified_ports = 0;
auto check_port_from_source = [&](OutputPort * port, std::string name)
{
if (!port)
return;
assertBlocksHaveEqualStructure(header, port->getHeader(), name);
++num_specified_ports;
auto it = std::find_if(outputs.begin(), outputs.end(), [port](const OutputPort & p) { return &p == port; });
if (it == outputs.end())
throw Exception("Cannot create Pipe because specified " + name + " port does not belong to source",
ErrorCodes::LOGICAL_ERROR);
};
check_port_from_source(output, "output");
check_port_from_source(totals, "totals");
check_port_from_source(extremes, "extremes");
if (num_specified_ports != outputs.size())
throw Exception("Cannot create Pipe from source because it has " + std::to_string(outputs.size()) +
" output ports, but " + std::to_string(num_specified_ports) + " were specified",
ErrorCodes::LOGICAL_ERROR);
}
totals_port = totals;
extremes_port = extremes;
output_ports.push_back(output);
processors.emplace_back(std::move(source));
max_parallel_streams = 1;
}
Pipe::Pipe(ProcessorPtr source)
{
if (auto * source_from_input_stream = typeid_cast<SourceFromInputStream *>(source.get()))
{
/// Special case for SourceFromInputStream. Will remove it later.
totals_port = source_from_input_stream->getTotalsPort();
extremes_port = source_from_input_stream->getExtremesPort();
}
else if (source->getOutputs().size() != 1)
checkSource(*source);
if (collected_processors)
collected_processors->emplace_back(source);
output_ports.push_back(&source->getOutputs().front());
header = output_ports.front()->getHeader();
processors.emplace_back(std::move(source));
max_parallel_streams = 1;
}
Pipe::Pipe(Processors processors_) : processors(std::move(processors_))
{
/// Create hash table with processors.
std::unordered_set<const IProcessor *> set;
for (const auto & processor : processors)
set.emplace(processor.get());
for (auto & processor : processors)
{
for (const auto & port : processor->getInputs())
{
if (!port.isConnected())
throw Exception("Cannot create Pipe because processor " + processor->getName() +
" has not connected input port", ErrorCodes::LOGICAL_ERROR);
const auto * connected_processor = &port.getOutputPort().getProcessor();
if (set.count(connected_processor) == 0)
throw Exception("Cannot create Pipe because processor " + processor->getName() +
" has input port which is connected with unknown processor " +
connected_processor->getName(), ErrorCodes::LOGICAL_ERROR);
}
for (auto & port : processor->getOutputs())
{
if (!port.isConnected())
{
output_ports.push_back(&port);
continue;
}
const auto * connected_processor = &port.getInputPort().getProcessor();
if (set.count(connected_processor) == 0)
throw Exception("Cannot create Pipe because processor " + processor->getName() +
" has output port which is connected with unknown processor " +
connected_processor->getName(), ErrorCodes::LOGICAL_ERROR);
}
}
if (output_ports.empty())
throw Exception("Cannot create Pipe because processors don't have any not-connected output ports",
ErrorCodes::LOGICAL_ERROR);
header = output_ports.front()->getHeader();
for (size_t i = 1; i < output_ports.size(); ++i)
assertBlocksHaveEqualStructure(header, output_ports[i]->getHeader(), "Pipe");
max_parallel_streams = output_ports.size();
if (collected_processors)
for (const auto & processor : processors)
collected_processors->emplace_back(processor);
}
static Pipes removeEmptyPipes(Pipes pipes)
{
Pipes res;
res.reserve(pipes.size());
for (auto & pipe : pipes)
{
if (!pipe.empty())
res.emplace_back(std::move(pipe));
}
return res;
}
/// Calculate common header for pipes.
/// This function is needed only to remove ColumnConst from common header in case if some columns are const, and some not.
/// E.g. if the first header is `x, const y, const z` and the second is `const x, y, const z`, the common header will be `x, y, const z`.
static Block getCommonHeader(const Pipes & pipes)
{
Block res;
for (const auto & pipe : pipes)
{
if (const auto & header = pipe.getHeader())
{
res = header;
break;
}
}
for (const auto & pipe : pipes)
{
const auto & header = pipe.getHeader();
for (size_t i = 0; i < res.columns(); ++i)
{
/// We do not check that headers are compatible here. Will do it later.
if (i >= header.columns())
break;
auto & common = res.getByPosition(i).column;
const auto & cur = header.getByPosition(i).column;
/// Only remove const from common header if it is not const for current pipe.
if (cur && common && !isColumnConst(*cur))
{
if (const auto * column_const = typeid_cast<const ColumnConst *>(common.get()))
common = column_const->getDataColumnPtr();
}
}
}
return res;
}
Pipe Pipe::unitePipes(Pipes pipes)
{
return Pipe::unitePipes(std::move(pipes), nullptr, false);
}
Pipe Pipe::unitePipes(Pipes pipes, Processors * collected_processors, bool allow_empty_header)
{
Pipe res;
for (auto & pipe : pipes)
res.holder = std::move(pipe.holder); /// see move assignment for Pipe::Holder.
pipes = removeEmptyPipes(std::move(pipes));
if (pipes.empty())
return res;
if (pipes.size() == 1)
{
pipes[0].holder = std::move(res.holder);
return std::move(pipes[0]);
}
OutputPortRawPtrs totals;
OutputPortRawPtrs extremes;
res.collected_processors = collected_processors;
res.header = getCommonHeader(pipes);
for (auto & pipe : pipes)
{
if (!allow_empty_header || pipe.header)
assertCompatibleHeader(pipe.header, res.header, "Pipe::unitePipes");
res.processors.insert(res.processors.end(), pipe.processors.begin(), pipe.processors.end());
res.output_ports.insert(res.output_ports.end(), pipe.output_ports.begin(), pipe.output_ports.end());
res.max_parallel_streams += pipe.max_parallel_streams;
if (pipe.totals_port)
totals.emplace_back(pipe.totals_port);
if (pipe.extremes_port)
extremes.emplace_back(pipe.extremes_port);
}
size_t num_processors = res.processors.size();
res.totals_port = uniteTotals(totals, res.header, res.processors);
res.extremes_port = uniteExtremes(extremes, res.header, res.processors);
if (res.collected_processors)
{
for (; num_processors < res.processors.size(); ++num_processors)
res.collected_processors->emplace_back(res.processors[num_processors]);
}
return res;
}
void Pipe::addSource(ProcessorPtr source)
{
checkSource(*source);
const auto & source_header = source->getOutputs().front().getHeader();
if (output_ports.empty())
header = source_header;
else
assertBlocksHaveEqualStructure(header, source_header, "Pipes");
if (collected_processors)
collected_processors->emplace_back(source);
output_ports.push_back(&source->getOutputs().front());
processors.emplace_back(std::move(source));
max_parallel_streams = std::max<size_t>(max_parallel_streams, output_ports.size());
}
void Pipe::addTotalsSource(ProcessorPtr source)
{
if (output_ports.empty())
throw Exception("Cannot add totals source to empty Pipe.", ErrorCodes::LOGICAL_ERROR);
if (totals_port)
throw Exception("Totals source was already added to Pipe.", ErrorCodes::LOGICAL_ERROR);
checkSource(*source);
const auto & source_header = output_ports.front()->getHeader();
assertBlocksHaveEqualStructure(header, source_header, "Pipes");
if (collected_processors)
collected_processors->emplace_back(source);
totals_port = &source->getOutputs().front();
processors.emplace_back(std::move(source));
}
void Pipe::addExtremesSource(ProcessorPtr source)
{
if (output_ports.empty())
throw Exception("Cannot add extremes source to empty Pipe.", ErrorCodes::LOGICAL_ERROR);
if (extremes_port)
throw Exception("Extremes source was already added to Pipe.", ErrorCodes::LOGICAL_ERROR);
checkSource(*source);
const auto & source_header = output_ports.front()->getHeader();
assertBlocksHaveEqualStructure(header, source_header, "Pipes");
if (collected_processors)
collected_processors->emplace_back(source);
extremes_port = &source->getOutputs().front();
processors.emplace_back(std::move(source));
}
static void dropPort(OutputPort *& port, Processors & processors, Processors * collected_processors)
{
if (port == nullptr)
return;
auto null_sink = std::make_shared<NullSink>(port->getHeader());
connect(*port, null_sink->getPort());
if (collected_processors)
collected_processors->emplace_back(null_sink);
processors.emplace_back(std::move(null_sink));
port = nullptr;
}
void Pipe::dropTotals()
{
dropPort(totals_port, processors, collected_processors);
}
void Pipe::dropExtremes()
{
dropPort(extremes_port, processors, collected_processors);
}
void Pipe::addTransform(ProcessorPtr transform)
{
addTransform(std::move(transform), static_cast<OutputPort *>(nullptr), static_cast<OutputPort *>(nullptr));
}
void Pipe::addTransform(ProcessorPtr transform, OutputPort * totals, OutputPort * extremes)
{
if (output_ports.empty())
throw Exception("Cannot add transform to empty Pipe.", ErrorCodes::LOGICAL_ERROR);
auto & inputs = transform->getInputs();
if (inputs.size() != output_ports.size())
throw Exception("Cannot add transform " + transform->getName() + " to Pipes because "
"Processor has " + std::to_string(inputs.size()) + " input ports, "
"but " + std::to_string(output_ports.size()) + " expected", ErrorCodes::LOGICAL_ERROR);
if (totals && totals_port)
throw Exception("Cannot add transform with totals to Pipe because it already has totals.",
ErrorCodes::LOGICAL_ERROR);
if (extremes && extremes_port)
throw Exception("Cannot add transform with extremes to Pipe because it already has extremes.",
ErrorCodes::LOGICAL_ERROR);
if (totals)
totals_port = totals;
if (extremes)
extremes_port = extremes;
size_t next_output = 0;
for (auto & input : inputs)
{
connect(*output_ports[next_output], input);
++next_output;
}
auto & outputs = transform->getOutputs();
output_ports.clear();
output_ports.reserve(outputs.size());
bool found_totals = false;
bool found_extremes = false;
for (auto & output : outputs)
{
if (&output == totals)
found_totals = true;
else if (&output == extremes)
found_extremes = true;
else
output_ports.emplace_back(&output);
}
if (totals && !found_totals)
throw Exception("Cannot add transform " + transform->getName() + " to Pipes because "
"specified totals port does not belong to it", ErrorCodes::LOGICAL_ERROR);
if (extremes && !found_extremes)
throw Exception("Cannot add transform " + transform->getName() + " to Pipes because "
"specified extremes port does not belong to it", ErrorCodes::LOGICAL_ERROR);
if (output_ports.empty())
throw Exception("Cannot add transform " + transform->getName() + " to Pipes because it has no outputs",
ErrorCodes::LOGICAL_ERROR);
header = output_ports.front()->getHeader();
for (size_t i = 1; i < output_ports.size(); ++i)
assertBlocksHaveEqualStructure(header, output_ports[i]->getHeader(), "Pipes");
// Temporarily skip this check. TotaslHavingTransform may return finalized totals but not finalized data.
// if (totals_port)
// assertBlocksHaveEqualStructure(header, totals_port->getHeader(), "Pipes");
if (extremes_port)
assertBlocksHaveEqualStructure(header, extremes_port->getHeader(), "Pipes");
if (collected_processors)
collected_processors->emplace_back(transform);
processors.emplace_back(std::move(transform));
max_parallel_streams = std::max<size_t>(max_parallel_streams, output_ports.size());
}
void Pipe::addTransform(ProcessorPtr transform, InputPort * totals, InputPort * extremes)
{
if (output_ports.empty())
throw Exception("Cannot add transform to empty Pipe.", ErrorCodes::LOGICAL_ERROR);
auto & inputs = transform->getInputs();
size_t expected_inputs = output_ports.size() + (totals ? 1 : 0) + (extremes ? 1 : 0);
if (inputs.size() != expected_inputs)
throw Exception("Cannot add transform " + transform->getName() + " to Pipes because "
"Processor has " + std::to_string(inputs.size()) + " input ports, "
"but " + std::to_string(expected_inputs) + " expected", ErrorCodes::LOGICAL_ERROR);
if (totals && !totals_port)
throw Exception("Cannot add transform consuming totals to Pipe because Pipe does not have totals.",
ErrorCodes::LOGICAL_ERROR);
if (extremes && !extremes_port)
throw Exception("Cannot add transform consuming extremes to Pipe because it already has extremes.",
ErrorCodes::LOGICAL_ERROR);
if (totals)
{
connect(*totals_port, *totals);
totals_port = nullptr;
}
if (extremes)
{
connect(*extremes_port, *extremes);
extremes_port = nullptr;
}
bool found_totals = false;
bool found_extremes = false;
size_t next_output = 0;
for (auto & input : inputs)
{
if (&input == totals)
found_totals = true;
else if (&input == extremes)
found_extremes = true;
else
{
connect(*output_ports[next_output], input);
++next_output;
}
}
if (totals && !found_totals)
throw Exception("Cannot add transform " + transform->getName() + " to Pipes because "
"specified totals port does not belong to it", ErrorCodes::LOGICAL_ERROR);
if (extremes && !found_extremes)
throw Exception("Cannot add transform " + transform->getName() + " to Pipes because "
"specified extremes port does not belong to it", ErrorCodes::LOGICAL_ERROR);
auto & outputs = transform->getOutputs();
if (outputs.empty())
throw Exception("Cannot add transform " + transform->getName() + " to Pipes because it has no outputs",
ErrorCodes::LOGICAL_ERROR);
output_ports.clear();
output_ports.reserve(outputs.size());
for (auto & output : outputs)
output_ports.emplace_back(&output);
header = output_ports.front()->getHeader();
for (size_t i = 1; i < output_ports.size(); ++i)
assertBlocksHaveEqualStructure(header, output_ports[i]->getHeader(), "Pipes");
if (totals_port)
assertBlocksHaveEqualStructure(header, totals_port->getHeader(), "Pipes");
if (extremes_port)
assertBlocksHaveEqualStructure(header, extremes_port->getHeader(), "Pipes");
if (collected_processors)
collected_processors->emplace_back(transform);
processors.emplace_back(std::move(transform));
max_parallel_streams = std::max<size_t>(max_parallel_streams, output_ports.size());
}
void Pipe::addSimpleTransform(const ProcessorGetterWithStreamKind & getter)
{
if (output_ports.empty())
throw Exception("Cannot add simple transform to empty Pipe.", ErrorCodes::LOGICAL_ERROR);
Block new_header;
auto add_transform = [&](OutputPort *& port, StreamType stream_type)
{
if (!port)
return;
auto transform = getter(port->getHeader(), stream_type);
if (transform)
{
if (transform->getInputs().size() != 1)
throw Exception("Processor for query pipeline transform should have single input, "
"but " + transform->getName() + " has " +
toString(transform->getInputs().size()) + " inputs.", ErrorCodes::LOGICAL_ERROR);
if (transform->getOutputs().size() != 1)
throw Exception("Processor for query pipeline transform should have single output, "
"but " + transform->getName() + " has " +
toString(transform->getOutputs().size()) + " outputs.", ErrorCodes::LOGICAL_ERROR);
}
const auto & out_header = transform ? transform->getOutputs().front().getHeader()
: port->getHeader();
if (new_header)
assertBlocksHaveEqualStructure(new_header, out_header, "QueryPipeline");
else
new_header = out_header;
if (transform)
{
connect(*port, transform->getInputs().front());
port = &transform->getOutputs().front();
if (collected_processors)
collected_processors->emplace_back(transform);
processors.emplace_back(std::move(transform));
}
};
for (auto & port : output_ports)
add_transform(port, StreamType::Main);
add_transform(totals_port, StreamType::Totals);
add_transform(extremes_port, StreamType::Extremes);
header = std::move(new_header);
}
void Pipe::addSimpleTransform(const ProcessorGetter & getter)
{
addSimpleTransform([&](const Block & stream_header, StreamType) { return getter(stream_header); });
}
void Pipe::resize(size_t num_streams, bool force, bool strict)
{
if (output_ports.empty())
throw Exception("Cannot resize an empty Pipe.", ErrorCodes::LOGICAL_ERROR);
if (!force && num_streams == numOutputPorts())
return;
ProcessorPtr resize;
if (strict)
resize = std::make_shared<StrictResizeProcessor>(getHeader(), numOutputPorts(), num_streams);
else
resize = std::make_shared<ResizeProcessor>(getHeader(), numOutputPorts(), num_streams);
addTransform(std::move(resize));
}
void Pipe::setSinks(const Pipe::ProcessorGetterWithStreamKind & getter)
{
if (output_ports.empty())
throw Exception("Cannot set sink to empty Pipe.", ErrorCodes::LOGICAL_ERROR);
auto add_transform = [&](OutputPort *& stream, Pipe::StreamType stream_type)
{
if (!stream)
return;
auto transform = getter(stream->getHeader(), stream_type);
if (transform)
{
if (transform->getInputs().size() != 1)
throw Exception("Sink for query pipeline transform should have single input, "
"but " + transform->getName() + " has " +
toString(transform->getInputs().size()) + " inputs.", ErrorCodes::LOGICAL_ERROR);
if (!transform->getOutputs().empty())
throw Exception("Sink for query pipeline transform should have no outputs, "
"but " + transform->getName() + " has " +
toString(transform->getOutputs().size()) + " outputs.", ErrorCodes::LOGICAL_ERROR);
}
if (!transform)
transform = std::make_shared<NullSink>(stream->getHeader());
connect(*stream, transform->getInputs().front());
processors.emplace_back(std::move(transform));
};
for (auto & port : output_ports)
add_transform(port, StreamType::Main);
add_transform(totals_port, StreamType::Totals);
add_transform(extremes_port, StreamType::Extremes);
output_ports.clear();
header.clear();
}
void Pipe::setOutputFormat(ProcessorPtr output)
{
if (output_ports.empty())
throw Exception("Cannot set output format to empty Pipe.", ErrorCodes::LOGICAL_ERROR);
if (output_ports.size() != 1)
throw Exception("Cannot set output format to Pipe because single output port is expected, "
"but it has " + std::to_string(output_ports.size()) + " ports", ErrorCodes::LOGICAL_ERROR);
auto * format = dynamic_cast<IOutputFormat * >(output.get());
if (!format)
throw Exception("IOutputFormat processor expected for QueryPipeline::setOutputFormat.",
ErrorCodes::LOGICAL_ERROR);
auto & main = format->getPort(IOutputFormat::PortKind::Main);
auto & totals = format->getPort(IOutputFormat::PortKind::Totals);
auto & extremes = format->getPort(IOutputFormat::PortKind::Extremes);
if (!totals_port)
addTotalsSource(std::make_shared<NullSource>(totals.getHeader()));
if (!extremes_port)
addExtremesSource(std::make_shared<NullSource>(extremes.getHeader()));
if (collected_processors)
collected_processors->emplace_back(output);
processors.emplace_back(std::move(output));
connect(*output_ports.front(), main);
connect(*totals_port, totals);
connect(*extremes_port, extremes);
output_ports.clear();
header.clear();
}
void Pipe::transform(const Transformer & transformer)
{
if (output_ports.empty())
throw Exception("Cannot transform empty Pipe.", ErrorCodes::LOGICAL_ERROR);
auto new_processors = transformer(output_ports);
/// Create hash table with new processors.
std::unordered_set<const IProcessor *> set;
for (const auto & processor : new_processors)
set.emplace(processor.get());
for (const auto & port : output_ports)
{
if (!port->isConnected())
throw Exception("Transformation of Pipe is not valid because output port (" +
port->getHeader().dumpStructure() + ") is not connected", ErrorCodes::LOGICAL_ERROR);
set.emplace(&port->getProcessor());
}
output_ports.clear();
for (const auto & processor : new_processors)
{
for (const auto & port : processor->getInputs())
{
if (!port.isConnected())
throw Exception("Transformation of Pipe is not valid because processor " + processor->getName() +
" has not connected input port", ErrorCodes::LOGICAL_ERROR);
const auto * connected_processor = &port.getOutputPort().getProcessor();
if (set.count(connected_processor) == 0)
throw Exception("Transformation of Pipe is not valid because processor " + processor->getName() +
" has input port which is connected with unknown processor " +
connected_processor->getName(), ErrorCodes::LOGICAL_ERROR);
}
for (auto & port : processor->getOutputs())
{
if (!port.isConnected())
{
output_ports.push_back(&port);
continue;
}
const auto * connected_processor = &port.getInputPort().getProcessor();
if (set.count(connected_processor) == 0)
throw Exception("Transformation of Pipe is not valid because processor " + processor->getName() +
" has output port which is connected with unknown processor " +
connected_processor->getName(), ErrorCodes::LOGICAL_ERROR);
}
}
if (output_ports.empty())
throw Exception("Transformation of Pipe is not valid because processors don't have any "
"not-connected output ports", ErrorCodes::LOGICAL_ERROR);
header = output_ports.front()->getHeader();
for (size_t i = 1; i < output_ports.size(); ++i)
assertBlocksHaveEqualStructure(header, output_ports[i]->getHeader(), "Pipe");
if (totals_port)
assertBlocksHaveEqualStructure(header, totals_port->getHeader(), "Pipes");
if (extremes_port)
assertBlocksHaveEqualStructure(header, extremes_port->getHeader(), "Pipes");
if (collected_processors)
{
for (const auto & processor : new_processors)
collected_processors->emplace_back(processor);
}
processors.insert(processors.end(), new_processors.begin(), new_processors.end());
max_parallel_streams = std::max<size_t>(max_parallel_streams, output_ports.size());
}
void Pipe::setLimits(const StreamLocalLimits & limits)
{
for (auto & processor : processors)
{
if (auto * source_with_progress = dynamic_cast<ISourceWithProgress *>(processor.get()))
source_with_progress->setLimits(limits);
}
}
void Pipe::setLeafLimits(const SizeLimits & leaf_limits)
{
for (auto & processor : processors)
{
if (auto * source_with_progress = dynamic_cast<ISourceWithProgress *>(processor.get()))
source_with_progress->setLeafLimits(leaf_limits);
}
}
void Pipe::setQuota(const std::shared_ptr<const EnabledQuota> & quota)
{
for (auto & processor : processors)
{
if (auto * source_with_progress = dynamic_cast<ISourceWithProgress *>(processor.get()))
source_with_progress->setQuota(quota);
}
}
}