Fix build.

Nikolai Kochetov 2020-03-19 16:45:19 +03:00
parent f7c17435b8
commit 100fe4c93c
3 changed files with 43 additions and 29 deletions

@@ -488,8 +488,6 @@ void PipelineExecutor::execute(size_t num_threads)
     if (!all_processors_finished)
         throw Exception("Pipeline stuck. Current state:\n" + dumpPipeline(), ErrorCodes::LOGICAL_ERROR);
-
-    std::cerr << dumpPipeline() << std::endl;
 }
 
 void PipelineExecutor::executeSingleThread(size_t thread_num, size_t num_threads)

@@ -120,6 +120,8 @@ void TreeExecutorBlockInputStream::init()
         connect(*totals_port, *input_totals_port);
         input_totals_port->setNeeded();
     }
+
+    initRowsBeforeLimit();
 }
 
 void TreeExecutorBlockInputStream::execute(bool on_totals)
@@ -206,42 +208,45 @@ void TreeExecutorBlockInputStream::execute(bool on_totals)
     }
 }
 
-void TreeExecutorBlockInputStream::calcRowsBeforeLimit()
+void TreeExecutorBlockInputStream::initRowsBeforeLimit()
 {
-    std::stack<IProcessor *> stack;
-    stack.push(root);
-
-    size_t rows_before_limit = 0;
-    bool has_limit = false;
+    std::vector<LimitTransform *> limits;
+    std::vector<SourceFromInputStream *> sources;
+
+    struct StackEntry
+    {
+        IProcessor * processor;
+        bool visited_limit;
+    };
+
+    std::stack<StackEntry> stack;
+    stack.push({root, false});
 
     while (!stack.empty())
     {
-        auto processor = stack.top();
+        auto processor = stack.top().processor;
+        bool visited_limit = stack.top().visited_limit;
         stack.pop();
 
-        if (auto * limit = typeid_cast<const LimitTransform *>(processor))
+        if (!visited_limit)
         {
-            has_limit = true;
-            rows_before_limit += limit->getRowsBeforeLimitAtLeast();
-        }
-
-        if (auto * source = typeid_cast<SourceFromInputStream *>(processor))
-        {
-            if (auto & stream = source->getStream())
+            if (auto * limit = typeid_cast<LimitTransform *>(processor))
             {
-                auto & profile_info = stream->getProfileInfo();
-                if (profile_info.hasAppliedLimit())
-                {
-                    has_limit = true;
-                    rows_before_limit += profile_info.getRowsBeforeLimit();
-                }
+                visited_limit = true;
+                limits.emplace_back(limit);
             }
+
+            if (auto * source = typeid_cast<SourceFromInputStream *>(processor))
+                sources.emplace_back(source);
         }
 
-        if (auto * sorting = typeid_cast<const PartialSortingTransform *>(processor))
+        if (auto * sorting = typeid_cast<PartialSortingTransform *>(processor))
         {
-            rows_before_limit += sorting->getNumReadRows();
-            has_limit = true;
+            if (!rows_before_limit_at_least)
+                rows_before_limit_at_least = std::make_shared<RowsBeforeLimitCounter>();
+
+            sorting->setRowsBeforeLimitCounter(rows_before_limit_at_least);
 
             /// Don't go to children. Take rows_before_limit from last PartialSortingTransform.
             continue;
@@ -250,12 +255,20 @@ void TreeExecutorBlockInputStream::calcRowsBeforeLimit()
         for (auto & child_port : processor->getInputs())
         {
             auto * child_processor = &child_port.getOutputPort().getProcessor();
-            stack.push(child_processor);
+            stack.push({child_processor, visited_limit});
         }
     }
 
-    if (has_limit)
-        info.setRowsBeforeLimit(rows_before_limit);
+    if (!rows_before_limit_at_least && (!limits.empty() && !sources.empty()))
+    {
+        rows_before_limit_at_least = std::make_shared<RowsBeforeLimitCounter>();
+
+        for (auto & limit : limits)
+            limit->setRowsBeforeLimitCounter(rows_before_limit_at_least);
+
+        for (auto & source : sources)
+            source->setRowsBeforeLimitCounter(rows_before_limit_at_least);
+    }
 }
 
 Block TreeExecutorBlockInputStream::readImpl()
@@ -271,7 +284,8 @@ Block TreeExecutorBlockInputStream::readImpl()
                 totals = getHeader().cloneWithColumns(input_totals_port->pull().detachColumns());
             }
 
-            calcRowsBeforeLimit();
+            if (rows_before_limit_at_least && rows_before_limit_at_least->hasAppliedLimit())
+                info.setRowsBeforeLimit(rows_before_limit_at_least->get());
 
             return {};
         }

@@ -1,6 +1,7 @@
 #pragma once
 #include <DataStreams/IBlockInputStream.h>
 #include <Processors/Pipe.h>
+#include <Processors/RowsBeforeLimitCounter.h>
 
 namespace DB
 {
@@ -55,6 +56,7 @@ private:
     IProcessor * root = nullptr;
     std::unique_ptr<InputPort> input_port;
     std::unique_ptr<InputPort> input_totals_port;
+    RowsBeforeLimitCounterPtr rows_before_limit_at_least;
 
     /// Remember sources that support progress.
     std::vector<ISourceWithProgress *> sources_with_progress;
@@ -65,7 +67,7 @@ private:
     /// Execute tree step-by-step until root returns next chunk or execution is finished.
     void execute(bool on_totals);
 
-    void calcRowsBeforeLimit();
+    void initRowsBeforeLimit();
 
     /// Moved from pipe.
    std::vector<std::shared_ptr<Context>> interpreter_context;
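
Note on the pattern this diff wires up: instead of walking the processor tree on every readImpl() call (the removed calcRowsBeforeLimit()), initRowsBeforeLimit() runs once and hands the same shared counter to every LimitTransform, SourceFromInputStream and PartialSortingTransform it finds; readImpl() then only reads hasAppliedLimit()/get(). The counter class itself is not part of this diff; below is a minimal sketch of what such a shared counter could look like, assuming the add()/setAppliedLimit() names (only get() and hasAppliedLimit() are visible above), not the actual RowsBeforeLimitCounter implementation.

#include <atomic>
#include <cstdint>
#include <memory>

/// Sketch of a thread-safe "rows before limit" accumulator shared via shared_ptr
/// between the processors registered in initRowsBeforeLimit(). Method names other
/// than get() and hasAppliedLimit() are assumptions, not taken from the diff.
class RowsBeforeLimitCounterSketch
{
public:
    /// Called by a processor once it knows how many rows it saw before the limit cut in.
    void add(uint64_t rows)
    {
        setAppliedLimit();
        rows_before_limit.fetch_add(rows, std::memory_order_release);
    }

    uint64_t get() const { return rows_before_limit.load(std::memory_order_acquire); }

    void setAppliedLimit() { has_applied_limit.store(true, std::memory_order_release); }
    bool hasAppliedLimit() const { return has_applied_limit.load(std::memory_order_acquire); }

private:
    std::atomic<uint64_t> rows_before_limit{0};
    std::atomic_bool has_applied_limit{false};
};

using RowsBeforeLimitCounterSketchPtr = std::shared_ptr<RowsBeforeLimitCounterSketch>;

With a counter of this shape, each registered processor calls add() when its row count is known, and the stream only reports rows_before_limit to clients once some processor has actually applied a limit, which is what the new check in readImpl() expresses.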