Merge pull request #13189 from ClickHouse/refactor-pipes-2

Remove TreeExecutorBIS

commit c786f05388
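
The change is mechanical across call sites: instead of wrapping a processor tree directly in TreeExecutorBlockInputStream, each site now builds a QueryPipeline, initializes it from the Pipe, pins it to a single thread, and exposes it through PipelineExecutingBlockInputStream. A condensed before/after sketch of the pattern, taken from the hunks below (`pipes` and `transform` stand for whatever the call site already had):

    // Before: the single-threaded tree executor wrapped the Pipe directly.
    auto stream = std::make_shared<TreeExecutorBlockInputStream>(
        Pipe(std::move(pipes), std::move(transform)));

    // After: a QueryPipeline driven through PipelineExecutingBlockInputStream.
    QueryPipeline pipeline;
    pipeline.init(Pipe(std::move(pipes), std::move(transform)));
    pipeline.setMaxThreads(1); /// Keep the old single-threaded behaviour.
    auto stream = std::make_shared<PipelineExecutingBlockInputStream>(std::move(pipeline));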
@@ -7,7 +7,8 @@
 #include <Processors/Pipe.h>
 #include <Processors/Sources/SourceFromInputStream.h>
 #include <Processors/Merges/MergingSortedTransform.h>
-#include <Processors/Executors/TreeExecutorBlockInputStream.h>
+#include <Processors/Executors/PipelineExecutingBlockInputStream.h>
+#include <Processors/QueryPipeline.h>
 
 using namespace DB;
 
@@ -85,7 +86,10 @@ TEST(MergingSortedTest, SimpleBlockSizeTest)
     auto transform = std::make_shared<MergingSortedTransform>(pipes.front().getHeader(), pipes.size(), sort_description,
                                                               DEFAULT_MERGE_BLOCK_SIZE, 0, nullptr, false, true);
 
-    auto stream = std::make_shared<TreeExecutorBlockInputStream>(Pipe(std::move(pipes), std::move(transform)));
+    QueryPipeline pipeline;
+    pipeline.init(Pipe(std::move(pipes), std::move(transform)));
+    pipeline.setMaxThreads(1);
+    auto stream = std::make_shared<PipelineExecutingBlockInputStream>(std::move(pipeline));
 
     size_t total_rows = 0;
     auto block1 = stream->read();
@@ -125,7 +129,10 @@ TEST(MergingSortedTest, MoreInterestingBlockSizes)
     auto transform = std::make_shared<MergingSortedTransform>(pipes.front().getHeader(), pipes.size(), sort_description,
                                                               DEFAULT_MERGE_BLOCK_SIZE, 0, nullptr, false, true);
 
-    auto stream = std::make_shared<TreeExecutorBlockInputStream>(Pipe(std::move(pipes), std::move(transform)));
+    QueryPipeline pipeline;
+    pipeline.init(Pipe(std::move(pipes), std::move(transform)));
+    pipeline.setMaxThreads(1);
+    auto stream = std::make_shared<PipelineExecutingBlockInputStream>(std::move(pipeline));
 
     auto block1 = stream->read();
     auto block2 = stream->read();
@@ -1,386 +0,0 @@
-#include <Processors/Executors/TreeExecutorBlockInputStream.h>
-#include <Processors/Sources/SourceWithProgress.h>
-#include <Interpreters/ProcessList.h>
-#include <stack>
-#include <Processors/Sources/SourceFromInputStream.h>
-#include <Processors/Transforms/AggregatingTransform.h>
-#include <Processors/LimitTransform.h>
-#include <Processors/Transforms/PartialSortingTransform.h>
-
-namespace DB
-{
-namespace ErrorCodes
-{
-    extern const int QUERY_WAS_CANCELLED;
-    extern const int LOGICAL_ERROR;
-}
-
-static void checkProcessorHasSingleOutput(IProcessor * processor)
-{
-    /// SourceFromInputStream may have totals port. Skip this check.
-    if (typeid_cast<const SourceFromInputStream *>(processor))
-        return;
-
-    size_t num_outputs = processor->getOutputs().size();
-    if (num_outputs != 1)
-        throw Exception("All processors in TreeExecutorBlockInputStream must have single output, "
-                        "but processor with name " + processor->getName() + " has " + std::to_string(num_outputs),
-                        ErrorCodes::LOGICAL_ERROR);
-}
-
-/// Check tree invariants (described in TreeExecutor.h).
-/// Collect sources with progress.
-static void validateTree(
-    const Processors & processors,
-    IProcessor * root, IProcessor * totals_root, IProcessor * extremes_root,
-    std::vector<ISourceWithProgress *> & sources)
-{
-    std::unordered_map<IProcessor *, size_t> index;
-
-    for (const auto & processor : processors)
-    {
-        bool is_inserted = index.try_emplace(processor.get(), index.size()).second;
-
-        if (!is_inserted)
-            throw Exception("Duplicate processor in TreeExecutorBlockInputStream with name " + processor->getName(),
-                            ErrorCodes::LOGICAL_ERROR);
-    }
-
-    std::vector<bool> is_visited(processors.size(), false);
-    std::stack<IProcessor *> stack;
-
-    stack.push(root);
-    if (totals_root)
-        stack.push(totals_root);
-    if (extremes_root)
-        stack.push(extremes_root);
-
-    while (!stack.empty())
-    {
-        IProcessor * node = stack.top();
-        stack.pop();
-
-        auto it = index.find(node);
-
-        if (it == index.end())
-            throw Exception("Processor with name " + node->getName() + " "
-                            "was not mentioned in list passed to TreeExecutorBlockInputStream, "
-                            "but was traversed to from other processors.", ErrorCodes::LOGICAL_ERROR);
-
-        size_t position = it->second;
-
-        if (is_visited[position])
-        {
-            /// SourceFromInputStream may have totals port. Skip this check.
-            if (typeid_cast<const SourceFromInputStream *>(node))
-                continue;
-
-            throw Exception("Processor with name " + node->getName() +
-                            " was visited twice while traverse in TreeExecutorBlockInputStream. "
-                            "Passed processors are not tree.", ErrorCodes::LOGICAL_ERROR);
-        }
-
-        is_visited[position] = true;
-
-        checkProcessorHasSingleOutput(node);
-
-        auto & children = node->getInputs();
-        for (auto & child : children)
-            stack.push(&child.getOutputPort().getProcessor());
-
-        /// Fill sources array.
-        if (children.empty())
-        {
-            if (auto * source = dynamic_cast<ISourceWithProgress *>(node))
-                sources.push_back(source);
-        }
-    }
-
-    for (size_t i = 0; i < is_visited.size(); ++i)
-        if (!is_visited[i])
-            throw Exception("Processor with name " + processors[i]->getName() +
-                            " was not visited by traverse in TreeExecutorBlockInputStream.", ErrorCodes::LOGICAL_ERROR);
-}
-
-void TreeExecutorBlockInputStream::init()
-{
-    if (processors.empty())
-        throw Exception("No processors were passed to TreeExecutorBlockInputStream.", ErrorCodes::LOGICAL_ERROR);
-
-    root = &output_port.getProcessor();
-    IProcessor * totals_root = nullptr;
-    IProcessor * extremes_root = nullptr;
-
-    if (totals_port)
-        totals_root = &totals_port->getProcessor();
-
-    if (extremes_port)
-        extremes_root = &extremes_port->getProcessor();
-
-    validateTree(processors, root, totals_root, extremes_root, sources_with_progress);
-
-    input_port = std::make_unique<InputPort>(getHeader(), root);
-    connect(output_port, *input_port);
-    input_port->setNeeded();
-
-    if (totals_port)
-    {
-        input_totals_port = std::make_unique<InputPort>(totals_port->getHeader(), root);
-        connect(*totals_port, *input_totals_port);
-        input_totals_port->setNeeded();
-    }
-
-    if (extremes_port)
-    {
-        input_extremes_port = std::make_unique<InputPort>(extremes_port->getHeader(), root);
-        connect(*extremes_port, *input_extremes_port);
-        input_extremes_port->setNeeded();
-    }
-
-    initRowsBeforeLimit();
-}
-
-void TreeExecutorBlockInputStream::execute(bool on_totals, bool on_extremes)
-{
-    std::stack<IProcessor *> stack;
-
-    if (on_totals)
-        stack.push(&totals_port->getProcessor());
-    else if (on_extremes)
-        stack.push(&extremes_port->getProcessor());
-    else
-        stack.push(root);
-
-    auto prepare_processor = [](IProcessor * processor)
-    {
-        try
-        {
-            return processor->prepare();
-        }
-        catch (Exception & exception)
-        {
-            exception.addMessage(" While executing processor " + processor->getName());
-            throw;
-        }
-    };
-
-    while (!stack.empty() && !is_cancelled)
-    {
-        IProcessor * node = stack.top();
-
-        auto status = prepare_processor(node);
-
-        switch (status)
-        {
-            case IProcessor::Status::NeedData:
-            {
-                auto & inputs = node->getInputs();
-
-                if (inputs.empty())
-                    throw Exception("Processors " + node->getName() + " with empty input "
-                                    "has returned NeedData in TreeExecutorBlockInputStream", ErrorCodes::LOGICAL_ERROR);
-
-                bool all_finished = true;
-
-                for (auto & input : inputs)
-                {
-                    if (input.isFinished())
-                        continue;
-
-                    all_finished = false;
-
-                    stack.push(&input.getOutputPort().getProcessor());
-                }
-
-                if (all_finished)
-                    throw Exception("Processors " + node->getName() + " has returned NeedData in TreeExecutorBlockInputStream, "
-                                    "but all it's inputs are finished.", ErrorCodes::LOGICAL_ERROR);
-                break;
-            }
-            case IProcessor::Status::PortFull:
-            case IProcessor::Status::Finished:
-            {
-                stack.pop();
-                break;
-            }
-            case IProcessor::Status::Ready:
-            {
-                node->work();
-
-                /// This is handled inside PipelineExecutor now,
-                /// and doesn't checked by processors as in IInputStream before.
-                if (process_list_element && process_list_element->isKilled())
-                    throw Exception("Query was cancelled", ErrorCodes::QUERY_WAS_CANCELLED);
-
-                break;
-            }
-            case IProcessor::Status::Async:
-            case IProcessor::Status::Wait:
-            case IProcessor::Status::ExpandPipeline:
-            {
-                throw Exception("Processor with name " + node->getName() + " "
-                                "returned status " + IProcessor::statusToName(status) + " "
-                                "which is not supported in TreeExecutorBlockInputStream.", ErrorCodes::LOGICAL_ERROR);
-            }
-        }
-    }
-}
-
-void TreeExecutorBlockInputStream::initRowsBeforeLimit()
-{
-    std::vector<LimitTransform *> limit_transforms;
-    std::vector<SourceFromInputStream *> sources;
-
-    struct StackEntry
-    {
-        IProcessor * processor;
-        bool visited_limit;
-    };
-
-    std::stack<StackEntry> stack;
-    stack.push({root, false});
-
-    while (!stack.empty())
-    {
-        auto * processor = stack.top().processor;
-        bool visited_limit = stack.top().visited_limit;
-        stack.pop();
-
-        if (!visited_limit)
-        {
-
-            if (auto * limit = typeid_cast<LimitTransform *>(processor))
-            {
-                visited_limit = true;
-                limit_transforms.emplace_back(limit);
-            }
-
-            if (auto * source = typeid_cast<SourceFromInputStream *>(processor))
-                sources.emplace_back(source);
-        }
-        else if (auto * sorting = typeid_cast<PartialSortingTransform *>(processor))
-        {
-            if (!rows_before_limit_at_least)
-                rows_before_limit_at_least = std::make_shared<RowsBeforeLimitCounter>();
-
-            sorting->setRowsBeforeLimitCounter(rows_before_limit_at_least);
-
-            /// Don't go to children. Take rows_before_limit from last PartialSortingTransform.
-            continue;
-        }
-
-        for (auto & child_port : processor->getInputs())
-        {
-            auto * child_processor = &child_port.getOutputPort().getProcessor();
-            stack.push({child_processor, visited_limit});
-        }
-    }
-
-    if (!rows_before_limit_at_least && (!limit_transforms.empty() || !sources.empty()))
-    {
-        rows_before_limit_at_least = std::make_shared<RowsBeforeLimitCounter>();
-
-        for (auto & limit : limit_transforms)
-            limit->setRowsBeforeLimitCounter(rows_before_limit_at_least);
-
-        for (auto & source : sources)
-            source->setRowsBeforeLimitCounter(rows_before_limit_at_least);
-    }
-
-    /// If there is a limit, then enable rows_before_limit_at_least
-    /// It is needed when zero rows is read, but we still want rows_before_limit_at_least in result.
-    if (!limit_transforms.empty())
-        rows_before_limit_at_least->add(0);
-}
-
-Block TreeExecutorBlockInputStream::readImpl()
-{
-    while (!is_cancelled)
-    {
-        if (input_port->isFinished())
-        {
-            if (totals_port && !input_totals_port->isFinished())
-            {
-                execute(true, false);
-                if (input_totals_port->hasData())
-                    totals = getHeader().cloneWithColumns(input_totals_port->pull().detachColumns());
-            }
-
-            if (extremes_port && !input_extremes_port->isFinished())
-            {
-                execute(false, true);
-                if (input_extremes_port->hasData())
-                    extremes = getHeader().cloneWithColumns(input_extremes_port->pull().detachColumns());
-            }
-
-            if (rows_before_limit_at_least && rows_before_limit_at_least->hasAppliedLimit())
-                info.setRowsBeforeLimit(rows_before_limit_at_least->get());
-
-            return {};
-        }
-
-        if (input_port->hasData())
-        {
-            auto chunk = input_port->pull();
-            Block block = getHeader().cloneWithColumns(chunk.detachColumns());
-
-            if (const auto & chunk_info = chunk.getChunkInfo())
-            {
-                if (const auto * agg_info = typeid_cast<const AggregatedChunkInfo *>(chunk_info.get()))
-                {
-                    block.info.bucket_num = agg_info->bucket_num;
-                    block.info.is_overflows = agg_info->is_overflows;
-                }
-            }
-
-            return block;
-        }
-
-        execute(false, false);
-    }
-
-    return {};
-}
-
-void TreeExecutorBlockInputStream::setProgressCallback(const ProgressCallback & callback)
-{
-    for (auto & source : sources_with_progress)
-        source->setProgressCallback(callback);
-}
-
-void TreeExecutorBlockInputStream::setProcessListElement(QueryStatus * elem)
-{
-    process_list_element = elem;
-
-    for (auto & source : sources_with_progress)
-        source->setProcessListElement(elem);
-}
-
-void TreeExecutorBlockInputStream::setLimits(const IBlockInputStream::LocalLimits & limits_)
-{
-    for (auto & source : sources_with_progress)
-        source->setLimits(limits_);
-}
-
-void TreeExecutorBlockInputStream::setQuota(const std::shared_ptr<const EnabledQuota> & quota_)
-{
-    for (auto & source : sources_with_progress)
-        source->setQuota(quota_);
-}
-
-void TreeExecutorBlockInputStream::addTotalRowsApprox(size_t value)
-{
-    /// Add only for one source.
-    if (!sources_with_progress.empty())
-        sources_with_progress.front()->addTotalRowsApprox(value);
-}
-
-void TreeExecutorBlockInputStream::cancel(bool kill)
-{
-    IBlockInputStream::cancel(kill);
-
-    for (auto & processor : processors)
-        processor->cancel();
-}
-
-}
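
For reference, the executor deleted above drove the whole tree on one thread with an explicit stack rather than recursion (see validateTree and execute). A minimal standalone sketch of that traversal pattern, using simplified stand-in types instead of the real IProcessor API:

    #include <cstdio>
    #include <stack>
    #include <vector>

    /// Simplified stand-in for a processor node; children feed data into it.
    struct Node
    {
        const char * name;
        std::vector<Node *> children;
    };

    /// Iterative depth-first walk, same shape as the deleted validateTree():
    /// push the root, then keep popping a node and pushing its children.
    void visitTree(Node * root)
    {
        std::stack<Node *> stack;
        stack.push(root);

        while (!stack.empty())
        {
            Node * node = stack.top();
            stack.pop();

            std::printf("visiting %s\n", node->name);

            for (Node * child : node->children)
                stack.push(child);
        }
    }

    int main()
    {
        Node source1{"source1", {}};
        Node source2{"source2", {}};
        Node merge{"merge", {&source1, &source2}};
        visitTree(&merge); /// Visits merge first, then its two sources.
    }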

@@ -1,82 +0,0 @@
-#pragma once
-#include <DataStreams/IBlockInputStream.h>
-#include <Processors/Pipe.h>
-#include <Processors/RowsBeforeLimitCounter.h>
-
-namespace DB
-{
-
-class ISourceWithProgress;
-
-/// It's a wrapper from processors tree-shaped pipeline to block input stream.
-/// Execute all processors in a single thread, by in-order tree traverse.
-/// Also, support for progress and quotas.
-class TreeExecutorBlockInputStream : public IBlockInputStream
-{
-public:
-    /// Last processor in list must be a tree root.
-    /// It is checked that
-    /// * processors form a tree
-    /// * all processors are attainable from root
-    /// * there is no other connected processors
-    explicit TreeExecutorBlockInputStream(Pipe pipe) : output_port(pipe.getPort())
-    {
-        for (auto & table_lock : pipe.getTableLocks())
-            addTableLock(table_lock);
-
-        for (auto & storage : pipe.getStorageHolders())
-            storage_holders.emplace_back(storage);
-
-        for (auto & context : pipe.getContexts())
-            interpreter_context.emplace_back(context);
-
-        totals_port = pipe.getTotalsPort();
-        extremes_port = pipe.getExtremesPort();
-        processors = std::move(pipe).detachProcessors();
-        init();
-    }
-
-    String getName() const override { return "TreeExecutor"; }
-    Block getHeader() const override { return root->getOutputs().front().getHeader(); }
-
-    void cancel(bool kill) override;
-
-    /// This methods does not affect TreeExecutor as IBlockInputStream itself.
-    /// They just passed to all SourceWithProgress processors.
-    void setProgressCallback(const ProgressCallback & callback) final;
-    void setProcessListElement(QueryStatus * elem) final;
-    void setLimits(const LocalLimits & limits_) final;
-    void setQuota(const std::shared_ptr<const EnabledQuota> & quota_) final;
-    void addTotalRowsApprox(size_t value) final;
-
-protected:
-    Block readImpl() override;
-
-private:
-    OutputPort & output_port;
-    OutputPort * totals_port = nullptr;
-    OutputPort * extremes_port = nullptr;
-    Processors processors;
-    IProcessor * root = nullptr;
-    std::unique_ptr<InputPort> input_port;
-    std::unique_ptr<InputPort> input_totals_port;
-    std::unique_ptr<InputPort> input_extremes_port;
-    RowsBeforeLimitCounterPtr rows_before_limit_at_least;
-
-    /// Remember sources that support progress.
-    std::vector<ISourceWithProgress *> sources_with_progress;
-
-    QueryStatus * process_list_element = nullptr;
-
-    void init();
-    /// Execute tree step-by-step until root returns next chunk or execution is finished.
-    void execute(bool on_totals, bool on_extremes);
-
-    void initRowsBeforeLimit();
-
-    /// Moved from pipe.
-    std::vector<std::shared_ptr<Context>> interpreter_context;
-    std::vector<StoragePtr> storage_holders;
-};
-
-}

@@ -231,13 +231,10 @@ void QueryPipeline::addSimpleTransformImpl(const TProcessorGetter & getter)
     auto & out_header = transform ? transform->getOutputs().front().getHeader()
                                   : stream->getHeader();
 
-    if (stream_type != StreamType::Totals)
-    {
-        if (header)
-            assertBlocksHaveEqualStructure(header, out_header, "QueryPipeline");
-        else
-            header = out_header;
-    }
+    if (header)
+        assertBlocksHaveEqualStructure(header, out_header, "QueryPipeline");
+    else
+        header = out_header;
 
     if (transform)
     {
@@ -340,6 +337,12 @@ void QueryPipeline::addPipe(Processors pipe)
         header = output.getHeader();
     }
 
+    if (totals_having_port)
+        assertBlocksHaveEqualStructure(header, totals_having_port->getHeader(), "QueryPipeline");
+
+    if (extremes_port)
+        assertBlocksHaveEqualStructure(header, extremes_port->getHeader(), "QueryPipeline");
+
     processors.emplace(pipe);
     current_header = std::move(header);
 }
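
Both QueryPipeline hunks rely on assertBlocksHaveEqualStructure, which throws if two block headers disagree in column names or types. A small illustration of the invariant being asserted, assuming the Block initializer-list constructor and the helper declared in Core/Block.h:

    #include <Core/Block.h>
    #include <DataTypes/DataTypesNumber.h>

    using namespace DB;

    int main()
    {
        /// Same column name, different types: structures are not equal.
        Block lhs{ColumnWithTypeAndName(std::make_shared<DataTypeUInt64>(), "x")};
        Block rhs{ColumnWithTypeAndName(std::make_shared<DataTypeInt32>(), "x")};

        /// Expected to throw DB::Exception mentioning the "example" context.
        assertBlocksHaveEqualStructure(lhs, rhs, "example");
    }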

@@ -14,7 +14,6 @@ SRCS(
     Executors/PipelineExecutor.cpp
     Executors/PullingAsyncPipelineExecutor.cpp
     Executors/PullingPipelineExecutor.cpp
-    Executors/TreeExecutorBlockInputStream.cpp
     ForkProcessor.cpp
     Formats/IInputFormat.cpp
     Formats/Impl/BinaryRowInputFormat.cpp

@@ -11,8 +11,6 @@
 #include <Common/quoteString.h>
 #include <Interpreters/ExpressionActions.h>
 
-#include <Processors/Executors/TreeExecutorBlockInputStream.h>
-
 
 namespace DB
 {

@@ -23,7 +23,7 @@
 #include <Processors/Merges/VersionedCollapsingTransform.h>
 #include <Processors/Transforms/ExpressionTransform.h>
 #include <Processors/Transforms/MaterializingTransform.h>
-#include <Processors/Executors/TreeExecutorBlockInputStream.h>
+#include <Processors/Executors/PipelineExecutingBlockInputStream.h>
 #include <Interpreters/MutationsInterpreter.h>
 #include <Interpreters/Context.h>
 #include <Common/SimpleIncrement.h>
@@ -799,8 +799,10 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
             break;
     }
 
-    Pipe merged_pipe(std::move(pipes), std::move(merged_transform));
-    BlockInputStreamPtr merged_stream = std::make_shared<TreeExecutorBlockInputStream>(std::move(merged_pipe));
+    QueryPipeline pipeline;
+    pipeline.init(Pipe(std::move(pipes), std::move(merged_transform)));
+    pipeline.setMaxThreads(1);
+    BlockInputStreamPtr merged_stream = std::make_shared<PipelineExecutingBlockInputStream>(std::move(pipeline));
 
     if (deduplicate)
         merged_stream = std::make_shared<DistinctSortedBlockInputStream>(merged_stream, sort_description, SizeLimits(), 0 /*limit_hint*/, Names());
@@ -915,8 +917,12 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
         column_part_source->setProgressCallback(
            MergeProgressCallback(merge_entry, watch_prev_elapsed, column_progress));
 
-        column_part_streams[part_num] = std::make_shared<TreeExecutorBlockInputStream>(
-            Pipe(std::move(column_part_source)));
+        QueryPipeline column_part_pipeline;
+        column_part_pipeline.init(Pipe(std::move(column_part_source)));
+        column_part_pipeline.setMaxThreads(1);
+
+        column_part_streams[part_num] =
+            std::make_shared<PipelineExecutingBlockInputStream>(std::move(column_part_pipeline));
     }
 
     rows_sources_read_buf.seek(0, 0);

@@ -44,7 +44,6 @@ namespace std
 #include <DataTypes/DataTypeEnum.h>
 #include <DataTypes/DataTypesNumber.h>
 #include <Processors/ConcatProcessor.h>
-#include <Processors/Executors/TreeExecutorBlockInputStream.h>
 #include <Processors/Merges/AggregatingSortedTransform.h>
 #include <Processors/Merges/CollapsingSortedTransform.h>
 #include <Processors/Merges/MergingSortedTransform.h>

@@ -6,7 +6,6 @@
 #include <Core/Defines.h>
 
 #include <ext/shared_ptr_helper.h>
-#include <Processors/Executors/TreeExecutorBlockInputStream.h>
 
 
 namespace DB

@@ -14,7 +14,8 @@
 #include <Common/tests/gtest_global_context.h>
 
 #include <memory>
-#include <Processors/Executors/TreeExecutorBlockInputStream.h>
+#include <Processors/Executors/PipelineExecutingBlockInputStream.h>
+#include <Processors/QueryPipeline.h>
 
 #if !__clang__
 # pragma GCC diagnostic push
@@ -115,8 +116,9 @@ std::string readData(DB::StoragePtr & table, const DB::Context & context)
 
     QueryProcessingStage::Enum stage = table->getQueryProcessingStage(context);
 
-    BlockInputStreamPtr in = std::make_shared<TreeExecutorBlockInputStream>(
-        std::move(table->read(column_names, metadata_snapshot, {}, context, stage, 8192, 1)[0]));
+    QueryPipeline pipeline;
+    pipeline.init(table->read(column_names, metadata_snapshot, {}, context, stage, 8192, 1));
+    BlockInputStreamPtr in = std::make_shared<PipelineExecutingBlockInputStream>(std::move(pipeline));
 
     Block sample;
     {
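
Consumers are unaffected by the swap: the wrapper is still an IBlockInputStream, so existing read loops keep working. A hypothetical consumer loop (processBlock is a stand-in, not part of the diff):

    /// stream is the PipelineExecutingBlockInputStream created as in the hunks above.
    stream->readPrefix();
    while (Block block = stream->read())
        processBlock(block); /// Hypothetical downstream consumer.
    stream->readSuffix();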