Update TreeExecutor.

This commit is contained in:
Nikolai Kochetov 2020-02-03 18:35:35 +03:00
parent 30586111a6
commit a832a630d8
2 changed files with 35 additions and 10 deletions

View File

@ -17,7 +17,7 @@ static void checkProcessorHasSingleOutput(IProcessor * processor)
/// Check tree invariants (described in TreeExecutor.h). /// Check tree invariants (described in TreeExecutor.h).
/// Collect sources with progress. /// Collect sources with progress.
static void validateTree(const Processors & processors, IProcessor * root, std::vector<ISourceWithProgress *> & sources) static void validateTree(const Processors & processors, IProcessor * root, IProcessor * totals_root, std::vector<ISourceWithProgress *> & sources)
{ {
std::unordered_map<IProcessor *, size_t> index; std::unordered_map<IProcessor *, size_t> index;
@ -34,6 +34,8 @@ static void validateTree(const Processors & processors, IProcessor * root, std::
std::stack<IProcessor *> stack; std::stack<IProcessor *> stack;
stack.push(root); stack.push(root);
if (totals_root)
stack.push(totals_root);
while (!stack.empty()) while (!stack.empty())
{ {
@ -81,18 +83,33 @@ void TreeExecutorBlockInputStream::init()
throw Exception("No processors were passed to TreeExecutorBlockInputStream.", ErrorCodes::LOGICAL_ERROR); throw Exception("No processors were passed to TreeExecutorBlockInputStream.", ErrorCodes::LOGICAL_ERROR);
root = &output_port.getProcessor(); root = &output_port.getProcessor();
IProcessor * totals_root = nullptr;
validateTree(processors, root, sources_with_progress); if (totals_port)
totals_root = &totals_port->getProcessor();
validateTree(processors, root, totals_root, sources_with_progress);
input_port = std::make_unique<InputPort>(getHeader(), root); input_port = std::make_unique<InputPort>(getHeader(), root);
connect(output_port, *input_port); connect(output_port, *input_port);
input_port->setNeeded(); input_port->setNeeded();
if (totals_port)
{
input_totals_port = std::make_unique<InputPort>(totals_port->getHeader(), root);
connect(*totals_port, *input_totals_port);
input_totals_port->setNeeded();
}
} }
void TreeExecutorBlockInputStream::execute() void TreeExecutorBlockInputStream::execute(bool on_totals)
{ {
std::stack<IProcessor *> stack; std::stack<IProcessor *> stack;
stack.push(root);
if (on_totals)
stack.push(&totals_port->getProcessor());
else
stack.push(root);
auto prepare_processor = [](IProcessor * processor) auto prepare_processor = [](IProcessor * processor)
{ {
@ -141,10 +158,6 @@ void TreeExecutorBlockInputStream::execute()
break; break;
} }
case IProcessor::Status::PortFull: case IProcessor::Status::PortFull:
{
stack.pop();
break;
}
case IProcessor::Status::Finished: case IProcessor::Status::Finished:
{ {
stack.pop(); stack.pop();
@ -178,12 +191,21 @@ Block TreeExecutorBlockInputStream::readImpl()
while (true) while (true)
{ {
if (input_port->isFinished()) if (input_port->isFinished())
{
if (totals_port && !input_totals_port->isFinished())
{
execute(true);
if (input_totals_port->hasData())
totals = getHeader().cloneWithColumns(input_totals_port->pull().detachColumns());
}
return {}; return {};
}
if (input_port->hasData()) if (input_port->hasData())
return getHeader().cloneWithColumns(input_port->pull().detachColumns()); return getHeader().cloneWithColumns(input_port->pull().detachColumns());
execute(); execute(false);
} }
} }

View File

@ -29,6 +29,7 @@ public:
for (auto & context : pipe.getContexts()) for (auto & context : pipe.getContexts())
interpreter_context.emplace_back(context); interpreter_context.emplace_back(context);
totals_port = pipe.getTotalsPort();
processors = std::move(pipe).detachProcessors(); processors = std::move(pipe).detachProcessors();
init(); init();
} }
@ -49,9 +50,11 @@ protected:
private: private:
OutputPort & output_port; OutputPort & output_port;
OutputPort * totals_port = nullptr;
Processors processors; Processors processors;
IProcessor * root = nullptr; IProcessor * root = nullptr;
std::unique_ptr<InputPort> input_port; std::unique_ptr<InputPort> input_port;
std::unique_ptr<InputPort> input_totals_port;
/// Remember sources that support progress. /// Remember sources that support progress.
std::vector<ISourceWithProgress *> sources_with_progress; std::vector<ISourceWithProgress *> sources_with_progress;
@ -60,7 +63,7 @@ private:
void init(); void init();
/// Execute tree step-by-step until root returns next chunk or execution is finished. /// Execute tree step-by-step until root returns next chunk or execution is finished.
void execute(); void execute(bool on_totals);
/// Moved from pipe. /// Moved from pipe.
std::vector<std::shared_ptr<Context>> interpreter_context; std::vector<std::shared_ptr<Context>> interpreter_context;