mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-21 15:12:02 +00:00
Merge pull request #10131 from ClickHouse/fix-extremes-for-processors
Fix extremes for processors
This commit is contained in:
commit
0782052768
@ -82,7 +82,16 @@ Pipe createLocalStream(const ASTPtr & query_ast, const Block & header, const Con
|
||||
/// This flag means that pipeline must be tree-shaped,
|
||||
/// so we can't enable processors for InterpreterSelectQuery here.
|
||||
auto stream = interpreter.execute().in;
|
||||
Pipe pipe(std::make_shared<SourceFromInputStream>(std::move(stream)));
|
||||
auto source = std::make_shared<SourceFromInputStream>(std::move(stream));
|
||||
|
||||
bool add_totals_and_extremes_port = processed_stage == QueryProcessingStage::Complete;
|
||||
if (add_totals_and_extremes_port)
|
||||
{
|
||||
source->addTotalsPort();
|
||||
source->addExtremesPort();
|
||||
}
|
||||
|
||||
Pipe pipe(std::move(source));
|
||||
|
||||
pipe.addSimpleTransform(std::make_shared<ConvertingTransform>(
|
||||
pipe.getHeader(), header, ConvertingTransform::MatchColumnsMode::Name, context));
|
||||
@ -130,7 +139,7 @@ void SelectStreamFactory::createForShard(
|
||||
Pipes & res)
|
||||
{
|
||||
bool force_add_agg_info = processed_stage == QueryProcessingStage::WithMergeableState;
|
||||
bool add_totals_port = processed_stage == QueryProcessingStage::Complete;
|
||||
bool add_totals_and_extremes_port = processed_stage == QueryProcessingStage::Complete;
|
||||
|
||||
auto modified_query_ast = query_ast->clone();
|
||||
if (has_virtual_shard_num_column)
|
||||
@ -153,8 +162,11 @@ void SelectStreamFactory::createForShard(
|
||||
|
||||
auto source = std::make_shared<SourceFromInputStream>(std::move(stream), force_add_agg_info);
|
||||
|
||||
if (add_totals_port)
|
||||
if (add_totals_and_extremes_port)
|
||||
{
|
||||
source->addTotalsPort();
|
||||
source->addExtremesPort();
|
||||
}
|
||||
|
||||
res.emplace_back(std::move(source));
|
||||
};
|
||||
@ -303,8 +315,11 @@ void SelectStreamFactory::createForShard(
|
||||
auto lazy_stream = std::make_shared<LazyBlockInputStream>("LazyShardWithLocalReplica", header, lazily_create_stream);
|
||||
auto source = std::make_shared<SourceFromInputStream>(std::move(lazy_stream), force_add_agg_info);
|
||||
|
||||
if (add_totals_port)
|
||||
if (add_totals_and_extremes_port)
|
||||
{
|
||||
source->addTotalsPort();
|
||||
source->addExtremesPort();
|
||||
}
|
||||
|
||||
res.emplace_back(std::move(source));
|
||||
}
|
||||
|
@ -87,7 +87,6 @@
|
||||
#include <Processors/Transforms/MergingSortedTransform.h>
|
||||
#include <Processors/Transforms/DistinctTransform.h>
|
||||
#include <Processors/Transforms/LimitByTransform.h>
|
||||
#include <Processors/Transforms/ExtremesTransform.h>
|
||||
#include <Processors/Transforms/CreatingSetsTransform.h>
|
||||
#include <Processors/Transforms/RollupTransform.h>
|
||||
#include <Processors/Transforms/CubeTransform.h>
|
||||
@ -2542,8 +2541,7 @@ void InterpreterSelectQuery::executeExtremes(QueryPipeline & pipeline)
|
||||
if (!context->getSettingsRef().extremes)
|
||||
return;
|
||||
|
||||
auto transform = std::make_shared<ExtremesTransform>(pipeline.getHeader());
|
||||
pipeline.addExtremesTransform(std::move(transform));
|
||||
pipeline.addExtremesTransform();
|
||||
}
|
||||
|
||||
|
||||
|
@ -30,7 +30,10 @@ static void checkProcessorHasSingleOutput(IProcessor * processor)
|
||||
|
||||
/// Check tree invariants (described in TreeExecutor.h).
|
||||
/// Collect sources with progress.
|
||||
static void validateTree(const Processors & processors, IProcessor * root, IProcessor * totals_root, std::vector<ISourceWithProgress *> & sources)
|
||||
static void validateTree(
|
||||
const Processors & processors,
|
||||
IProcessor * root, IProcessor * totals_root, IProcessor * extremes_root,
|
||||
std::vector<ISourceWithProgress *> & sources)
|
||||
{
|
||||
std::unordered_map<IProcessor *, size_t> index;
|
||||
|
||||
@ -49,6 +52,8 @@ static void validateTree(const Processors & processors, IProcessor * root, IProc
|
||||
stack.push(root);
|
||||
if (totals_root)
|
||||
stack.push(totals_root);
|
||||
if (extremes_root)
|
||||
stack.push(extremes_root);
|
||||
|
||||
while (!stack.empty())
|
||||
{
|
||||
@ -104,11 +109,15 @@ void TreeExecutorBlockInputStream::init()
|
||||
|
||||
root = &output_port.getProcessor();
|
||||
IProcessor * totals_root = nullptr;
|
||||
IProcessor * extremes_root = nullptr;
|
||||
|
||||
if (totals_port)
|
||||
totals_root = &totals_port->getProcessor();
|
||||
|
||||
validateTree(processors, root, totals_root, sources_with_progress);
|
||||
if (extremes_port)
|
||||
extremes_root = &extremes_port->getProcessor();
|
||||
|
||||
validateTree(processors, root, totals_root, extremes_root, sources_with_progress);
|
||||
|
||||
input_port = std::make_unique<InputPort>(getHeader(), root);
|
||||
connect(output_port, *input_port);
|
||||
@ -121,15 +130,24 @@ void TreeExecutorBlockInputStream::init()
|
||||
input_totals_port->setNeeded();
|
||||
}
|
||||
|
||||
if (extremes_port)
|
||||
{
|
||||
input_extremes_port = std::make_unique<InputPort>(extremes_port->getHeader(), root);
|
||||
connect(*extremes_port, *input_extremes_port);
|
||||
input_extremes_port->setNeeded();
|
||||
}
|
||||
|
||||
initRowsBeforeLimit();
|
||||
}
|
||||
|
||||
void TreeExecutorBlockInputStream::execute(bool on_totals)
|
||||
void TreeExecutorBlockInputStream::execute(bool on_totals, bool on_extremes)
|
||||
{
|
||||
std::stack<IProcessor *> stack;
|
||||
|
||||
if (on_totals)
|
||||
stack.push(&totals_port->getProcessor());
|
||||
else if (on_extremes)
|
||||
stack.push(&extremes_port->getProcessor());
|
||||
else
|
||||
stack.push(root);
|
||||
|
||||
@ -283,11 +301,18 @@ Block TreeExecutorBlockInputStream::readImpl()
|
||||
{
|
||||
if (totals_port && !input_totals_port->isFinished())
|
||||
{
|
||||
execute(true);
|
||||
execute(true, false);
|
||||
if (input_totals_port->hasData())
|
||||
totals = getHeader().cloneWithColumns(input_totals_port->pull().detachColumns());
|
||||
}
|
||||
|
||||
if (extremes_port && !input_extremes_port->isFinished())
|
||||
{
|
||||
execute(false, true);
|
||||
if (input_extremes_port->hasData())
|
||||
extremes = getHeader().cloneWithColumns(input_extremes_port->pull().detachColumns());
|
||||
}
|
||||
|
||||
if (rows_before_limit_at_least && rows_before_limit_at_least->hasAppliedLimit())
|
||||
info.setRowsBeforeLimit(rows_before_limit_at_least->get());
|
||||
|
||||
@ -311,7 +336,7 @@ Block TreeExecutorBlockInputStream::readImpl()
|
||||
return block;
|
||||
}
|
||||
|
||||
execute(false);
|
||||
execute(false, false);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -31,6 +31,7 @@ public:
|
||||
interpreter_context.emplace_back(context);
|
||||
|
||||
totals_port = pipe.getTotalsPort();
|
||||
extremes_port = pipe.getExtremesPort();
|
||||
processors = std::move(pipe).detachProcessors();
|
||||
init();
|
||||
}
|
||||
@ -52,10 +53,12 @@ protected:
|
||||
private:
|
||||
OutputPort & output_port;
|
||||
OutputPort * totals_port = nullptr;
|
||||
OutputPort * extremes_port = nullptr;
|
||||
Processors processors;
|
||||
IProcessor * root = nullptr;
|
||||
std::unique_ptr<InputPort> input_port;
|
||||
std::unique_ptr<InputPort> input_totals_port;
|
||||
std::unique_ptr<InputPort> input_extremes_port;
|
||||
RowsBeforeLimitCounterPtr rows_before_limit_at_least;
|
||||
|
||||
/// Remember sources that support progress.
|
||||
@ -65,7 +68,7 @@ private:
|
||||
|
||||
void init();
|
||||
/// Execute tree step-by-step until root returns next chunk or execution is finished.
|
||||
void execute(bool on_totals);
|
||||
void execute(bool on_totals, bool on_extremes);
|
||||
|
||||
void initRowsBeforeLimit();
|
||||
|
||||
|
@ -225,7 +225,7 @@ void PrettyBlockOutputFormat::consumeTotals(Chunk chunk)
|
||||
{
|
||||
total_rows = 0;
|
||||
writeSuffixIfNot();
|
||||
writeCString("\nExtremes:\n", out);
|
||||
writeCString("\nTotals:\n", out);
|
||||
write(chunk, PortKind::Totals);
|
||||
}
|
||||
|
||||
@ -233,7 +233,7 @@ void PrettyBlockOutputFormat::consumeExtremes(Chunk chunk)
|
||||
{
|
||||
total_rows = 0;
|
||||
writeSuffixIfNot();
|
||||
writeCString("\nTotals:\n", out);
|
||||
writeCString("\nExtremes:\n", out);
|
||||
write(chunk, PortKind::Extremes);
|
||||
}
|
||||
|
||||
|
@ -1,9 +1,11 @@
|
||||
#pragma once
|
||||
#include <Processors/IProcessor.h>
|
||||
#include <Processors/ISink.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
/// Sink which closes input port and reads nothing.
|
||||
class NullSink : public IProcessor
|
||||
{
|
||||
public:
|
||||
@ -19,4 +21,15 @@ public:
|
||||
InputPort & getPort() { return inputs.front(); }
|
||||
};
|
||||
|
||||
/// Sink which reads everything and do nothing with it.
|
||||
class EmptySink : public ISink
|
||||
{
|
||||
public:
|
||||
explicit EmptySink(Block header) : ISink(std::move(header)) {}
|
||||
String getName() const override { return "EmptySink"; }
|
||||
|
||||
protected:
|
||||
void consume(Chunk) override {}
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -1,5 +1,6 @@
|
||||
#include <Processors/Pipe.h>
|
||||
#include <IO/WriteHelpers.h>
|
||||
#include <Processors/Sources/SourceFromInputStream.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
@ -48,7 +49,7 @@ static void checkSource(const IProcessor & source)
|
||||
throw Exception("Source for pipe should have single output, but it doesn't have any",
|
||||
ErrorCodes::LOGICAL_ERROR);
|
||||
|
||||
if (source.getOutputs().size() > 2)
|
||||
if (source.getOutputs().size() > 1)
|
||||
throw Exception("Source for pipe should have single or two outputs, but " + source.getName() + " has " +
|
||||
toString(source.getOutputs().size()) + " outputs.", ErrorCodes::LOGICAL_ERROR);
|
||||
}
|
||||
@ -56,18 +57,22 @@ static void checkSource(const IProcessor & source)
|
||||
|
||||
Pipe::Pipe(ProcessorPtr source)
|
||||
{
|
||||
checkSource(*source);
|
||||
output_port = &source->getOutputs().front();
|
||||
if (auto * source_from_input_stream = typeid_cast<SourceFromInputStream *>(source.get()))
|
||||
{
|
||||
totals = source_from_input_stream->getTotalsPort();
|
||||
extremes = source_from_input_stream->getExtremesPort();
|
||||
}
|
||||
else if (source->getOutputs().size() != 1)
|
||||
checkSource(*source);
|
||||
|
||||
if (source->getOutputs().size() > 1)
|
||||
totals = &source->getOutputs().back();
|
||||
output_port = &source->getOutputs().front();
|
||||
|
||||
processors.emplace_back(std::move(source));
|
||||
max_parallel_streams = 1;
|
||||
}
|
||||
|
||||
Pipe::Pipe(Processors processors_, OutputPort * output_port_, OutputPort * totals_)
|
||||
: processors(std::move(processors_)), output_port(output_port_), totals(totals_)
|
||||
Pipe::Pipe(Processors processors_, OutputPort * output_port_, OutputPort * totals_, OutputPort * extremes_)
|
||||
: processors(std::move(processors_)), output_port(output_port_), totals(totals_), extremes(extremes_)
|
||||
{
|
||||
}
|
||||
|
||||
|
@ -47,8 +47,11 @@ public:
|
||||
|
||||
void enableQuota();
|
||||
|
||||
/// Totals and extremes port.
|
||||
void setTotalsPort(OutputPort * totals_) { totals = totals_; }
|
||||
void setExtremesPort(OutputPort * extremes_) { extremes = extremes_; }
|
||||
OutputPort * getTotalsPort() const { return totals; }
|
||||
OutputPort * getExtremesPort() const { return extremes; }
|
||||
|
||||
size_t maxParallelStreams() const { return max_parallel_streams; }
|
||||
|
||||
@ -67,6 +70,7 @@ private:
|
||||
Processors processors;
|
||||
OutputPort * output_port = nullptr;
|
||||
OutputPort * totals = nullptr;
|
||||
OutputPort * extremes = nullptr;
|
||||
|
||||
/// It is the max number of processors which can be executed in parallel for each step. See QueryPipeline::Streams.
|
||||
size_t max_parallel_streams = 0;
|
||||
@ -84,7 +88,7 @@ private:
|
||||
/// and therefore we can skip those checks.
|
||||
/// Note that Pipe represents a tree if it was created using public interface. But this constructor can't assert it.
|
||||
/// So, it's possible that TreeExecutorBlockInputStream could be unable to convert such Pipe to IBlockInputStream.
|
||||
explicit Pipe(Processors processors_, OutputPort * output_port, OutputPort * totals);
|
||||
explicit Pipe(Processors processors_, OutputPort * output_port, OutputPort * totals, OutputPort * extremes);
|
||||
|
||||
friend class QueryPipeline;
|
||||
};
|
||||
|
@ -60,6 +60,58 @@ void QueryPipeline::init(Pipe pipe)
|
||||
init(std::move(pipes));
|
||||
}
|
||||
|
||||
static OutputPort * uniteExtremes(const std::vector<OutputPort *> & ports, const Block & header, Processors & processors)
|
||||
{
|
||||
/// Here we calculate extremes for extremes in case we unite several pipelines.
|
||||
/// Example: select number from numbers(2) union all select number from numbers(3)
|
||||
|
||||
/// ->> Resize -> Extremes --(output port)----> Null
|
||||
/// --(extremes port)--> ...
|
||||
|
||||
auto resize = std::make_shared<ResizeProcessor>(header, ports.size(), 1);
|
||||
auto extremes = std::make_shared<ExtremesTransform>(header);
|
||||
auto sink = std::make_shared<EmptySink>(header);
|
||||
|
||||
auto * extremes_port = &extremes->getExtremesPort();
|
||||
|
||||
auto in = resize->getInputs().begin();
|
||||
for (auto & port : ports)
|
||||
connect(*port, *(in++));
|
||||
|
||||
connect(resize->getOutputs().front(), extremes->getInputPort());
|
||||
connect(extremes->getOutputPort(), sink->getPort());
|
||||
|
||||
processors.emplace_back(std::move(resize));
|
||||
processors.emplace_back(std::move(extremes));
|
||||
processors.emplace_back(std::move(sink));
|
||||
|
||||
return extremes_port;
|
||||
}
|
||||
|
||||
static OutputPort * uniteTotals(const std::vector<OutputPort *> & ports, const Block & header, Processors & processors)
|
||||
{
|
||||
/// Calculate totals fro several streams.
|
||||
/// Take totals from first sources which has any, skip others.
|
||||
|
||||
/// ->> Concat -> Limit
|
||||
|
||||
auto concat = std::make_shared<ConcatProcessor>(header, ports.size());
|
||||
auto limit = std::make_shared<LimitTransform>(header, 1, 0);
|
||||
|
||||
auto * totals_port = &limit->getOutputPort();
|
||||
|
||||
auto in = concat->getInputs().begin();
|
||||
for (auto & port : ports)
|
||||
connect(*port, *(in++));
|
||||
|
||||
connect(concat->getOutputs().front(), limit->getInputPort());
|
||||
|
||||
processors.emplace_back(std::move(concat));
|
||||
processors.emplace_back(std::move(limit));
|
||||
|
||||
return totals_port;
|
||||
}
|
||||
|
||||
void QueryPipeline::init(Pipes pipes)
|
||||
{
|
||||
if (initialized())
|
||||
@ -82,6 +134,7 @@ void QueryPipeline::init(Pipes pipes)
|
||||
}
|
||||
|
||||
std::vector<OutputPort *> totals;
|
||||
std::vector<OutputPort *> extremes;
|
||||
|
||||
for (auto & pipe : pipes)
|
||||
{
|
||||
@ -98,6 +151,12 @@ void QueryPipeline::init(Pipes pipes)
|
||||
totals.emplace_back(totals_port);
|
||||
}
|
||||
|
||||
if (auto * port = pipe.getExtremesPort())
|
||||
{
|
||||
assertBlocksHaveEqualStructure(current_header, port->getHeader(), "QueryPipeline");
|
||||
extremes.emplace_back(port);
|
||||
}
|
||||
|
||||
streams.addStream(&pipe.getPort(), pipe.maxParallelStreams());
|
||||
auto cur_processors = std::move(pipe).detachProcessors();
|
||||
processors.insert(processors.end(), cur_processors.begin(), cur_processors.end());
|
||||
@ -108,15 +167,15 @@ void QueryPipeline::init(Pipes pipes)
|
||||
if (totals.size() == 1)
|
||||
totals_having_port = totals.back();
|
||||
else
|
||||
{
|
||||
auto resize = std::make_shared<ResizeProcessor>(current_header, totals.size(), 1);
|
||||
auto in = resize->getInputs().begin();
|
||||
for (auto & total : totals)
|
||||
connect(*total, *(in++));
|
||||
totals_having_port = uniteTotals(totals, current_header, processors);
|
||||
}
|
||||
|
||||
totals_having_port = &resize->getOutputs().front();
|
||||
processors.emplace_back(std::move(resize));
|
||||
}
|
||||
if (!extremes.empty())
|
||||
{
|
||||
if (extremes.size() == 1)
|
||||
extremes_port = extremes.back();
|
||||
else
|
||||
extremes_port = uniteExtremes(extremes, current_header, processors);
|
||||
}
|
||||
}
|
||||
|
||||
@ -356,29 +415,31 @@ void QueryPipeline::dropTotalsIfHas()
|
||||
}
|
||||
}
|
||||
|
||||
void QueryPipeline::addExtremesTransform(ProcessorPtr transform)
|
||||
void QueryPipeline::addExtremesTransform()
|
||||
{
|
||||
checkInitialized();
|
||||
|
||||
if (!typeid_cast<const ExtremesTransform *>(transform.get()))
|
||||
throw Exception("ExtremesTransform expected for QueryPipeline::addExtremesTransform.",
|
||||
ErrorCodes::LOGICAL_ERROR);
|
||||
|
||||
if (extremes_port)
|
||||
throw Exception("Extremes transform was already added to pipeline.", ErrorCodes::LOGICAL_ERROR);
|
||||
|
||||
if (getNumStreams() != 1)
|
||||
throw Exception("Cant't add Extremes transform because pipeline is expected to have single stream, "
|
||||
"but it has " + toString(getNumStreams()) + " streams.", ErrorCodes::LOGICAL_ERROR);
|
||||
std::vector<OutputPort *> extremes;
|
||||
extremes.reserve(streams.size());
|
||||
|
||||
connect(*streams.front(), transform->getInputs().front());
|
||||
for (auto & stream : streams)
|
||||
{
|
||||
auto transform = std::make_shared<ExtremesTransform>(current_header);
|
||||
connect(*stream, transform->getInputPort());
|
||||
|
||||
auto & outputs = transform->getOutputs();
|
||||
stream = &transform->getOutputPort();
|
||||
extremes.push_back(&transform->getExtremesPort());
|
||||
|
||||
streams.assign({ &outputs.front() });
|
||||
extremes_port = &outputs.back();
|
||||
current_header = outputs.front().getHeader();
|
||||
processors.emplace_back(std::move(transform));
|
||||
processors.emplace_back(std::move(transform));
|
||||
}
|
||||
|
||||
if (extremes.size() == 1)
|
||||
extremes_port = extremes.front();
|
||||
else
|
||||
extremes_port = uniteExtremes(extremes, current_header, processors);
|
||||
}
|
||||
|
||||
void QueryPipeline::addCreatingSetsTransform(ProcessorPtr transform)
|
||||
@ -455,6 +516,13 @@ void QueryPipeline::unitePipelines(
|
||||
});
|
||||
|
||||
std::vector<OutputPort *> extremes;
|
||||
std::vector<OutputPort *> totals;
|
||||
|
||||
if (extremes_port)
|
||||
extremes.push_back(extremes_port);
|
||||
|
||||
if (totals_having_port)
|
||||
totals.push_back(totals_having_port);
|
||||
|
||||
for (auto & pipeline : pipelines)
|
||||
{
|
||||
@ -479,17 +547,12 @@ void QueryPipeline::unitePipelines(
|
||||
/// Take totals only from first port.
|
||||
if (pipeline.totals_having_port)
|
||||
{
|
||||
if (!totals_having_port)
|
||||
{
|
||||
auto converting = std::make_shared<ConvertingTransform>(
|
||||
pipeline.current_header, common_header, ConvertingTransform::MatchColumnsMode::Position, context);
|
||||
auto converting = std::make_shared<ConvertingTransform>(
|
||||
pipeline.current_header, common_header, ConvertingTransform::MatchColumnsMode::Position, context);
|
||||
|
||||
connect(*pipeline.totals_having_port, converting->getInputPort());
|
||||
totals_having_port = &converting->getOutputPort();
|
||||
processors.push_back(std::move(converting));
|
||||
}
|
||||
else
|
||||
pipeline.dropTotalsIfHas();
|
||||
connect(*pipeline.totals_having_port, converting->getInputPort());
|
||||
totals.push_back(&converting->getOutputPort());
|
||||
processors.push_back(std::move(converting));
|
||||
}
|
||||
|
||||
processors.insert(processors.end(), pipeline.processors.begin(), pipeline.processors.end());
|
||||
@ -504,28 +567,18 @@ void QueryPipeline::unitePipelines(
|
||||
|
||||
if (!extremes.empty())
|
||||
{
|
||||
size_t num_inputs = extremes.size() + (extremes_port ? 1u : 0u);
|
||||
|
||||
if (num_inputs == 1)
|
||||
extremes_port = extremes.front();
|
||||
if (extremes.size() == 1)
|
||||
extremes_port = extremes.back();
|
||||
else
|
||||
{
|
||||
/// Add extra processor for extremes.
|
||||
auto resize = std::make_shared<ResizeProcessor>(current_header, num_inputs, 1);
|
||||
auto input = resize->getInputs().begin();
|
||||
extremes_port = uniteExtremes(extremes, current_header, processors);
|
||||
}
|
||||
|
||||
if (extremes_port)
|
||||
connect(*extremes_port, *(input++));
|
||||
|
||||
for (auto & output : extremes)
|
||||
connect(*output, *(input++));
|
||||
|
||||
auto transform = std::make_shared<ExtremesTransform>(current_header);
|
||||
extremes_port = &transform->getOutputPort();
|
||||
|
||||
connect(resize->getOutputs().front(), transform->getInputPort());
|
||||
processors.emplace_back(std::move(transform));
|
||||
}
|
||||
if (!totals.empty())
|
||||
{
|
||||
if (totals.size() == 1)
|
||||
totals_having_port = totals.back();
|
||||
else
|
||||
totals_having_port = uniteTotals(totals, current_header, processors);
|
||||
}
|
||||
}
|
||||
|
||||
@ -644,7 +697,7 @@ void QueryPipeline::initRowsBeforeLimit()
|
||||
Pipe QueryPipeline::getPipe() &&
|
||||
{
|
||||
resize(1);
|
||||
Pipe pipe(std::move(processors), streams.at(0), totals_having_port);
|
||||
Pipe pipe(std::move(processors), streams.at(0), totals_having_port, extremes_port);
|
||||
pipe.max_parallel_streams = streams.maxParallelStreams();
|
||||
|
||||
for (auto & lock : table_locks)
|
||||
@ -659,6 +712,9 @@ Pipe QueryPipeline::getPipe() &&
|
||||
if (totals_having_port)
|
||||
pipe.setTotalsPort(totals_having_port);
|
||||
|
||||
if (extremes_port)
|
||||
pipe.setExtremesPort(extremes_port);
|
||||
|
||||
return pipe;
|
||||
}
|
||||
|
||||
|
@ -99,7 +99,7 @@ public:
|
||||
void addSimpleTransform(const ProcessorGetterWithStreamKind & getter);
|
||||
void addPipe(Processors pipe);
|
||||
void addTotalsHavingTransform(ProcessorPtr transform);
|
||||
void addExtremesTransform(ProcessorPtr transform);
|
||||
void addExtremesTransform();
|
||||
void addCreatingSetsTransform(ProcessorPtr transform);
|
||||
void setOutput(ProcessorPtr output);
|
||||
|
||||
|
@ -28,11 +28,20 @@ void SourceFromInputStream::init()
|
||||
|
||||
void SourceFromInputStream::addTotalsPort()
|
||||
{
|
||||
if (has_totals_port)
|
||||
if (totals_port)
|
||||
throw Exception("Totals port was already added for SourceFromInputStream.", ErrorCodes::LOGICAL_ERROR);
|
||||
|
||||
outputs.emplace_back(outputs.front().getHeader(), this);
|
||||
has_totals_port = true;
|
||||
totals_port = &outputs.back();
|
||||
}
|
||||
|
||||
void SourceFromInputStream::addExtremesPort()
|
||||
{
|
||||
if (extremes_port)
|
||||
throw Exception("Extremes port was already added for SourceFromInputStream.", ErrorCodes::LOGICAL_ERROR);
|
||||
|
||||
outputs.emplace_back(outputs.front().getHeader(), this);
|
||||
extremes_port = &outputs.back();
|
||||
}
|
||||
|
||||
IProcessor::Status SourceFromInputStream::prepare()
|
||||
@ -47,23 +56,32 @@ IProcessor::Status SourceFromInputStream::prepare()
|
||||
if (!is_stream_finished && !isCancelled())
|
||||
return Status::Ready;
|
||||
|
||||
if (has_totals_port)
|
||||
if (totals_port && !totals_port->isFinished())
|
||||
{
|
||||
auto & totals_out = outputs.back();
|
||||
|
||||
if (totals_out.isFinished())
|
||||
return Status::Finished;
|
||||
|
||||
if (has_totals)
|
||||
{
|
||||
if (!totals_out.canPush())
|
||||
if (!totals_port->canPush())
|
||||
return Status::PortFull;
|
||||
|
||||
totals_out.push(std::move(totals));
|
||||
totals_port->push(std::move(totals));
|
||||
has_totals = false;
|
||||
}
|
||||
|
||||
totals_out.finish();
|
||||
totals_port->finish();
|
||||
}
|
||||
|
||||
if (extremes_port && !extremes_port->isFinished())
|
||||
{
|
||||
if (has_extremes)
|
||||
{
|
||||
if (!extremes_port->canPush())
|
||||
return Status::PortFull;
|
||||
|
||||
extremes_port->push(std::move(extremes));
|
||||
has_extremes = false;
|
||||
}
|
||||
|
||||
extremes_port->finish();
|
||||
}
|
||||
}
|
||||
|
||||
@ -138,13 +156,22 @@ Chunk SourceFromInputStream::generate()
|
||||
|
||||
if (auto totals_block = stream->getTotals())
|
||||
{
|
||||
if (totals_block.rows() == 1) /// Sometimes we can get empty totals. Skip it.
|
||||
if (totals_block.rows() > 0) /// Sometimes we can get empty totals. Skip it.
|
||||
{
|
||||
totals.setColumns(totals_block.getColumns(), 1);
|
||||
totals.setColumns(totals_block.getColumns(), totals_block.rows());
|
||||
has_totals = true;
|
||||
}
|
||||
}
|
||||
|
||||
if (auto extremes_block = stream->getExtremes())
|
||||
{
|
||||
if (extremes_block.rows() > 0) /// Sometimes we can get empty extremes. Skip it.
|
||||
{
|
||||
extremes.setColumns(extremes_block.getColumns(), extremes_block.rows());
|
||||
has_extremes = true;
|
||||
}
|
||||
}
|
||||
|
||||
is_stream_finished = true;
|
||||
return {};
|
||||
}
|
||||
|
@ -23,6 +23,10 @@ public:
|
||||
BlockInputStreamPtr & getStream() { return stream; }
|
||||
|
||||
void addTotalsPort();
|
||||
void addExtremesPort();
|
||||
|
||||
OutputPort * getTotalsPort() const { return totals_port; }
|
||||
OutputPort * getExtremesPort() const { return extremes_port; }
|
||||
|
||||
void setRowsBeforeLimitCounter(RowsBeforeLimitCounterPtr counter) { rows_before_limit.swap(counter); }
|
||||
|
||||
@ -44,9 +48,13 @@ private:
|
||||
RowsBeforeLimitCounterPtr rows_before_limit;
|
||||
|
||||
Chunk totals;
|
||||
bool has_totals_port = false;
|
||||
OutputPort * totals_port = nullptr;
|
||||
bool has_totals = false;
|
||||
|
||||
Chunk extremes;
|
||||
OutputPort * extremes_port = nullptr;
|
||||
bool has_extremes = false;
|
||||
|
||||
bool is_generating_finished = false;
|
||||
bool is_stream_finished = false;
|
||||
bool is_stream_started = false;
|
||||
|
110
tests/queries/0_stateless/01232_extremes.reference
Normal file
110
tests/queries/0_stateless/01232_extremes.reference
Normal file
@ -0,0 +1,110 @@
|
||||
0
|
||||
1
|
||||
|
||||
0
|
||||
1
|
||||
-
|
||||
|
||||
-
|
||||
-
|
||||
0
|
||||
1
|
||||
0
|
||||
1
|
||||
|
||||
0
|
||||
1
|
||||
-
|
||||
|
||||
-
|
||||
-
|
||||
0
|
||||
1
|
||||
0
|
||||
1
|
||||
|
||||
0
|
||||
1
|
||||
-
|
||||
|
||||
-
|
||||
-
|
||||
0
|
||||
1
|
||||
0
|
||||
1
|
||||
|
||||
0
|
||||
1
|
||||
-
|
||||
|
||||
-
|
||||
-
|
||||
0
|
||||
1
|
||||
|
||||
0
|
||||
1
|
||||
------
|
||||
|
||||
------
|
||||
------
|
||||
0
|
||||
0
|
||||
0
|
||||
1
|
||||
1
|
||||
2
|
||||
|
||||
0
|
||||
2
|
||||
-
|
||||
|
||||
-
|
||||
-
|
||||
0
|
||||
0
|
||||
0
|
||||
1
|
||||
1
|
||||
2
|
||||
|
||||
0
|
||||
2
|
||||
-
|
||||
|
||||
-
|
||||
-
|
||||
0
|
||||
0
|
||||
0
|
||||
1
|
||||
1
|
||||
2
|
||||
|
||||
0
|
||||
2
|
||||
------
|
||||
|
||||
------
|
||||
------
|
||||
0
|
||||
0
|
||||
1
|
||||
1
|
||||
2
|
||||
|
||||
0
|
||||
2
|
||||
-
|
||||
|
||||
-
|
||||
-
|
||||
0
|
||||
0
|
||||
1
|
||||
1
|
||||
2
|
||||
|
||||
0
|
||||
2
|
55
tests/queries/0_stateless/01232_extremes.sql
Normal file
55
tests/queries/0_stateless/01232_extremes.sql
Normal file
@ -0,0 +1,55 @@
|
||||
set send_logs_level = 'error';
|
||||
set extremes = 1;
|
||||
-- set experimental_use_processors=0;
|
||||
|
||||
select * from remote('127.0.0.1', numbers(2));
|
||||
select '-';
|
||||
select * from remote('127.0.0.{1,1}', numbers(2));
|
||||
select '-';
|
||||
select * from remote('127.0.0.{1,2}', numbers(2));
|
||||
select '-';
|
||||
select * from remote('127.0.0.{2,2}', numbers(2));
|
||||
select '-';
|
||||
select * from remote('127.0.0.2', numbers(2));
|
||||
select '------';
|
||||
|
||||
select * from (select * from numbers(2) union all select * from numbers(3) union all select * from numbers(1)) order by number;
|
||||
select '-';
|
||||
select * from (select * from numbers(1) union all select * from numbers(2) union all select * from numbers(3)) order by number;
|
||||
select '-';
|
||||
select * from (select * from numbers(3) union all select * from numbers(1) union all select * from numbers(2)) order by number;
|
||||
select '------';
|
||||
|
||||
create database if not exists shard_0;
|
||||
create database if not exists shard_1;
|
||||
|
||||
drop table if exists shard_0.num_01232;
|
||||
drop table if exists shard_0.num2_01232;
|
||||
drop table if exists shard_1.num_01232;
|
||||
drop table if exists shard_1.num2_01232;
|
||||
drop table if exists distr;
|
||||
drop table if exists distr2;
|
||||
|
||||
create table shard_0.num_01232 (number UInt64) engine = MergeTree order by number;
|
||||
create table shard_1.num_01232 (number UInt64) engine = MergeTree order by number;
|
||||
insert into shard_0.num_01232 select number from numbers(2);
|
||||
insert into shard_1.num_01232 select number from numbers(3);
|
||||
create table distr (number UInt64) engine = Distributed(test_cluster_two_shards_different_databases, '', num_01232);
|
||||
|
||||
create table shard_0.num2_01232 (number UInt64) engine = MergeTree order by number;
|
||||
create table shard_1.num2_01232 (number UInt64) engine = MergeTree order by number;
|
||||
insert into shard_0.num2_01232 select number from numbers(3);
|
||||
insert into shard_1.num2_01232 select number from numbers(2);
|
||||
create table distr2 (number UInt64) engine = Distributed(test_cluster_two_shards_different_databases, '', num2_01232);
|
||||
|
||||
select * from distr order by number;
|
||||
select '-';
|
||||
select * from distr2 order by number;
|
||||
|
||||
drop table if exists shard_0.num_01232;
|
||||
drop table if exists shard_0.num2_01232;
|
||||
drop table if exists shard_1.num_01232;
|
||||
drop table if exists shard_1.num2_01232;
|
||||
drop table if exists distr;
|
||||
drop table if exists distr2;
|
||||
|
Loading…
Reference in New Issue
Block a user