ClickHouse/dbms/src/Processors/Executors/TreeExecutorBlockInputStream.cpp

215 lines
6.6 KiB
C++
Raw Normal View History

2019-10-21 16:26:29 +00:00
#include <Processors/Executors/TreeExecutorBlockInputStream.h>
2019-10-04 15:40:05 +00:00
#include <Processors/Sources/SourceWithProgress.h>
2019-09-13 12:34:05 +00:00
#include <stack>
namespace DB
{
/// Ensure that the processor exposes exactly one output port.
/// Throws LOGICAL_ERROR (mentioning the processor name and the actual number of outputs) otherwise.
static void checkProcessorHasSingleOutput(IProcessor * processor)
{
    const size_t outputs_count = processor->getOutputs().size();
    if (outputs_count == 1)
        return;

    throw Exception("All processors in TreeExecutorBlockInputStream must have single output, "
                    "but processor with name " + processor->getName() + " has " + std::to_string(outputs_count),
                    ErrorCodes::LOGICAL_ERROR);
}
2019-10-10 14:16:15 +00:00
/// Check tree invariants (described in TreeExecutor.h).
/// Collect sources with progress.
2019-10-04 15:40:05 +00:00
static void validateTree(const Processors & processors, IProcessor * root, std::vector<ISourceWithProgress *> & sources)
2019-09-13 12:34:05 +00:00
{
std::unordered_map<IProcessor *, size_t> index;
for (auto & processor : processors)
{
bool is_inserted = index.try_emplace(processor.get(), index.size()).second;
if (!is_inserted)
2019-10-21 16:26:29 +00:00
throw Exception("Duplicate processor in TreeExecutorBlockInputStream with name " + processor->getName(),
2019-09-13 12:34:05 +00:00
ErrorCodes::LOGICAL_ERROR);
}
std::vector<bool> is_visited(processors.size(), false);
std::stack<IProcessor *> stack;
stack.push(root);
while (!stack.empty())
{
IProcessor * node = stack.top();
stack.pop();
auto it = index.find(node);
if (it == index.end())
throw Exception("Processor with name " + node->getName() + " "
2019-10-21 16:26:29 +00:00
"was not mentioned in list passed to TreeExecutorBlockInputStream, "
2019-09-13 12:34:05 +00:00
"but was traversed to from other processors.", ErrorCodes::LOGICAL_ERROR);
size_t position = it->second;
if (is_visited[position])
2019-10-21 16:26:29 +00:00
throw Exception("Processor with name " + node->getName() + " was visited twice while traverse in TreeExecutorBlockInputStream. "
2019-09-13 12:34:05 +00:00
"Passed processors are not tree.", ErrorCodes::LOGICAL_ERROR);
2019-10-01 18:30:23 +00:00
is_visited[position] = true;
2019-09-13 12:34:05 +00:00
checkProcessorHasSingleOutput(node);
auto & children = node->getInputs();
for (auto & child : children)
stack.push(&child.getOutputPort().getProcessor());
2019-10-04 15:40:05 +00:00
/// Fill sources array.
if (children.empty())
{
if (auto * source = dynamic_cast<ISourceWithProgress *>(node))
sources.push_back(source);
}
2019-09-13 12:34:05 +00:00
}
for (size_t i = 0; i < is_visited.size(); ++i)
if (!is_visited[i])
throw Exception("Processor with name " + processors[i]->getName() +
2019-10-21 16:26:29 +00:00
" was not visited by traverse in TreeExecutorBlockInputStream.", ErrorCodes::LOGICAL_ERROR);
2019-09-13 12:34:05 +00:00
}
2019-10-21 16:26:29 +00:00
void TreeExecutorBlockInputStream::init()
2019-09-13 12:34:05 +00:00
{
if (processors.empty())
2019-10-21 16:26:29 +00:00
throw Exception("No processors were passed to TreeExecutorBlockInputStream.", ErrorCodes::LOGICAL_ERROR);
2019-09-13 12:34:05 +00:00
root = &output_port.getProcessor();
2019-09-13 12:34:05 +00:00
2019-10-04 15:40:05 +00:00
validateTree(processors, root, sources_with_progress);
2019-09-13 12:34:05 +00:00
input_port = std::make_unique<InputPort>(getHeader(), root);
connect(output_port, *input_port);
input_port->setNeeded();
2019-09-13 12:34:05 +00:00
}
2019-10-21 16:26:29 +00:00
void TreeExecutorBlockInputStream::execute()
2019-09-13 12:34:05 +00:00
{
std::stack<IProcessor *> stack;
stack.push(root);
auto prepare_processor = [](IProcessor * processor)
{
try
{
return processor->prepare();
}
catch (Exception & exception)
{
exception.addMessage(" While executing processor " + processor->getName());
throw;
}
};
2019-09-13 12:34:05 +00:00
while (!stack.empty())
{
IProcessor * node = stack.top();
auto status = prepare_processor(node);
2019-09-13 12:34:05 +00:00
switch (status)
{
case IProcessor::Status::NeedData:
{
auto & inputs = node->getInputs();
if (inputs.empty())
throw Exception("Processors " + node->getName() + " with empty input "
2019-10-21 16:26:29 +00:00
"has returned NeedData in TreeExecutorBlockInputStream", ErrorCodes::LOGICAL_ERROR);
2019-09-13 12:34:05 +00:00
bool all_finished = true;
for (auto & input : inputs)
{
if (input.isFinished())
continue;
all_finished = false;
stack.push(&input.getOutputPort().getProcessor());
}
if (all_finished)
2019-10-21 16:26:29 +00:00
throw Exception("Processors " + node->getName() + " has returned NeedData in TreeExecutorBlockInputStream, "
2019-09-13 12:34:05 +00:00
"but all it's inputs are finished.", ErrorCodes::LOGICAL_ERROR);
break;
}
case IProcessor::Status::PortFull:
{
stack.pop();
break;
}
case IProcessor::Status::Finished:
{
stack.pop();
break;
}
case IProcessor::Status::Ready:
{
node->work();
break;
}
case IProcessor::Status::Async:
case IProcessor::Status::Wait:
case IProcessor::Status::ExpandPipeline:
{
throw Exception("Processor with name " + node->getName() + " "
"returned status " + IProcessor::statusToName(status) + " "
2019-10-21 16:26:29 +00:00
"which is not supported in TreeExecutorBlockInputStream.", ErrorCodes::LOGICAL_ERROR);
2019-09-13 12:34:05 +00:00
}
}
}
}
2019-10-21 16:26:29 +00:00
/// Return the next block pulled from the input port.
/// While the port is not finished and has no data, execute() is called to make progress.
/// An empty block is returned when the port is finished.
Block TreeExecutorBlockInputStream::readImpl()
{
    while (!input_port->isFinished())
    {
        if (input_port->hasData())
            return getHeader().cloneWithColumns(input_port->pull().detachColumns());

        execute();
    }

    return {};
}
2019-10-21 16:26:29 +00:00
void TreeExecutorBlockInputStream::setProgressCallback(const ProgressCallback & callback)
2019-10-04 15:40:05 +00:00
{
for (auto & source : sources_with_progress)
source->setProgressCallback(callback);
}
2019-10-21 16:26:29 +00:00
/// Pass the process list element to every source from `sources_with_progress`.
void TreeExecutorBlockInputStream::setProcessListElement(QueryStatus * elem)
{
    for (auto & progress_source : sources_with_progress)
    {
        progress_source->setProcessListElement(elem);
    }
}
2019-10-21 16:26:29 +00:00
/// Pass the limits to every source from `sources_with_progress`.
void TreeExecutorBlockInputStream::setLimits(const IBlockInputStream::LocalLimits & limits_)
{
    for (auto & progress_source : sources_with_progress)
    {
        progress_source->setLimits(limits_);
    }
}
2019-10-21 16:26:29 +00:00
/// Pass the quota to every source from `sources_with_progress`.
void TreeExecutorBlockInputStream::setQuota(QuotaForIntervals & quota_)
{
    for (auto & progress_source : sources_with_progress)
    {
        progress_source->setQuota(quota_);
    }
}
2019-10-21 16:26:29 +00:00
void TreeExecutorBlockInputStream::addTotalRowsApprox(size_t value)
2019-10-04 15:40:05 +00:00
{
/// Add only for one source.
if (!sources_with_progress.empty())
sources_with_progress.front()->addTotalRowsApprox(value);
}
2019-09-13 12:34:05 +00:00
}