Delay totals port for creating sets as well.

This commit is contained in:
Nikolai Kochetov 2023-11-30 18:23:05 +00:00
parent 30148972ed
commit 08f943462f
4 changed files with 110 additions and 122 deletions

View File

@ -111,6 +111,8 @@ QueryPipelineBuilderPtr CreatingSetsStep::updatePipeline(QueryPipelineBuilders p
else
delayed_pipeline = std::move(*pipelines.front());
delayed_pipeline.dropTotalsAndExtremes();
QueryPipelineProcessorsCollector collector(*main_pipeline, this);
main_pipeline->addPipelineBefore(std::move(delayed_pipeline));
auto added_processors = collector.detachProcessors();

View File

@ -434,68 +434,130 @@ void Pipe::addTransform(ProcessorPtr transform)
}
void Pipe::addTransform(ProcessorPtr transform, OutputPort * totals, OutputPort * extremes)
{
addTransform(std::move(transform),
static_cast<InputPort *>(nullptr), static_cast<InputPort *>(nullptr),
totals, extremes);
}
void Pipe::addTransform(ProcessorPtr transform, InputPort * totals, InputPort * extremes)
{
addTransform(std::move(transform),
totals, extremes,
static_cast<OutputPort *>(nullptr), static_cast<OutputPort *>(nullptr));
}
void Pipe::addTransform(
ProcessorPtr transform,
InputPort * totals_in, InputPort * extremes_in,
OutputPort * totals_out, OutputPort * extremes_out)
{
if (output_ports.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot add transform to empty Pipe");
if (totals_in && !totals_port)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot add transform consuming totals to Pipe because Pipe does not have totals");
if (extremes_in && !extremes_port)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot add transform consuming extremes to Pipe because Pipe does not have extremes");
if (totals_out && !totals_in && totals_port)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot add transform with totals to Pipe because it already has totals");
if (extremes_out && !extremes_in && extremes_port)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot add transform with extremes to Pipe because it already has extremes");
auto & inputs = transform->getInputs();
if (inputs.size() != output_ports.size())
auto & outputs = transform->getOutputs();
size_t expected_inputs = output_ports.size() + (totals_in ? 1 : 0) + (extremes_in ? 1 : 0);
if (inputs.size() != expected_inputs)
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Cannot add transform {} to Pipe because it has {} input ports, but {} expected",
transform->getName(),
inputs.size(),
output_ports.size());
expected_inputs);
if (totals && totals_port)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot add transform with totals to Pipe because it already has totals");
if (outputs.size() <= (totals_out ? 1 : 0) + (extremes_out ? 1 : 0))
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot add transform {} to Pipes because it has no outputs",
transform->getName());
if (extremes && extremes_port)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot add transform with extremes to Pipe because it already has extremes");
bool found_totals_in = false;
bool found_extremes_in = false;
if (totals)
totals_port = totals;
if (extremes)
extremes_port = extremes;
size_t next_output = 0;
for (auto & input : inputs)
{
connect(*output_ports[next_output], input);
++next_output;
if (&input == totals_in)
found_totals_in = true;
else if (&input == extremes_in)
found_extremes_in = true;
}
auto & outputs = transform->getOutputs();
if (totals_in && !found_totals_in)
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Cannot add transform {} to Pipes because specified totals port does not belong to it",
transform->getName());
output_ports.clear();
output_ports.reserve(outputs.size());
if (extremes_in && !found_extremes_in)
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Cannot add transform {} to Pipes because specified extremes port does not belong to it",
transform->getName());
bool found_totals = false;
bool found_extremes = false;
bool found_totals_out = false;
bool found_extremes_out = false;
for (auto & output : outputs)
{
if (&output == totals)
found_totals = true;
else if (&output == extremes)
found_extremes = true;
else
output_ports.emplace_back(&output);
if (&output == totals_out)
found_totals_out = true;
else if (&output == extremes_out)
found_extremes_out = true;
}
if (totals && !found_totals)
if (totals_out && !found_totals_out)
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Cannot add transform {} to Pipes because specified totals port does not belong to it",
transform->getName());
if (extremes && !found_extremes)
if (extremes_out && !found_extremes_out)
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Cannot add transform {} to Pipes because specified extremes port does not belong to it",
transform->getName());
if (output_ports.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot add transform {} to Pipes because it has no outputs",
transform->getName());
if (totals_in)
{
connect(*totals_port, *totals_in);
totals_port = nullptr;
}
if (extremes_in)
{
connect(*extremes_port, *extremes_in);
extremes_port = nullptr;
}
totals_port = totals_out ? totals_out : totals_port;
extremes_port = extremes_out ? extremes_out : extremes_port;
size_t next_output = 0;
for (auto & input : inputs)
{
if (&input != totals_in && &input != extremes_in)
{
connect(*output_ports[next_output], input);
++next_output;
}
}
output_ports.clear();
output_ports.reserve(outputs.size());
for (auto & output : outputs)
{
if (&output != totals_out && &output != extremes_out)
output_ports.emplace_back(&output);
}
header = output_ports.front()->getHeader();
for (size_t i = 1; i < output_ports.size(); ++i)
@ -508,100 +570,11 @@ void Pipe::addTransform(ProcessorPtr transform, OutputPort * totals, OutputPort
if (extremes_port)
assertBlocksHaveEqualStructure(header, extremes_port->getHeader(), "Pipes");
if (collected_processors)
collected_processors->emplace_back(transform);
processors->emplace_back(std::move(transform));
max_parallel_streams = std::max<size_t>(max_parallel_streams, output_ports.size());
}
void Pipe::addTransform(ProcessorPtr transform, InputPort * totals, InputPort * extremes)
{
if (output_ports.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot add transform to empty Pipe");
auto & inputs = transform->getInputs();
size_t expected_inputs = output_ports.size() + (totals ? 1 : 0) + (extremes ? 1 : 0);
if (inputs.size() != expected_inputs)
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Cannot add transform {} to Pipe because it has {} input ports, but {} expected",
transform->getName(),
inputs.size(),
expected_inputs);
if (totals && !totals_port)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot add transform consuming totals to Pipe because Pipe does not have totals");
if (extremes && !extremes_port)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot add transform consuming extremes to Pipe because Pipe does not have extremes");
if (totals)
{
connect(*totals_port, *totals);
totals_port = nullptr;
}
if (extremes)
{
connect(*extremes_port, *extremes);
extremes_port = nullptr;
}
bool found_totals = false;
bool found_extremes = false;
size_t next_output = 0;
for (auto & input : inputs)
{
if (&input == totals)
found_totals = true;
else if (&input == extremes)
found_extremes = true;
else
{
connect(*output_ports[next_output], input);
++next_output;
}
}
if (totals && !found_totals)
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Cannot add transform {} to Pipes because specified totals port does not belong to it",
transform->getName());
if (extremes && !found_extremes)
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Cannot add transform {} to Pipes because specified extremes port does not belong to it",
transform->getName());
auto & outputs = transform->getOutputs();
if (outputs.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot add transform {} to Pipes because it has no outputs", transform->getName());
output_ports.clear();
output_ports.reserve(outputs.size());
for (auto & output : outputs)
output_ports.emplace_back(&output);
header = output_ports.front()->getHeader();
for (size_t i = 1; i < output_ports.size(); ++i)
assertBlocksHaveEqualStructure(header, output_ports[i]->getHeader(), "Pipes");
if (totals_port)
assertBlocksHaveEqualStructure(header, totals_port->getHeader(), "Pipes");
if (extremes_port)
assertBlocksHaveEqualStructure(header, extremes_port->getHeader(), "Pipes");
if (collected_processors)
collected_processors->emplace_back(transform);
processors->emplace_back(std::move(transform));
max_parallel_streams = std::max<size_t>(max_parallel_streams, output_ports.size());
}

View File

@ -69,6 +69,11 @@ public:
void addTransform(ProcessorPtr transform, OutputPort * totals, OutputPort * extremes);
void addTransform(ProcessorPtr transform, InputPort * totals, InputPort * extremes);
void addTransform(
ProcessorPtr transform,
InputPort * totals_in, InputPort * extremes_in,
OutputPort * totals_out, OutputPort * extremes_out);
enum class StreamType
{
Main = 0, /// Stream for query data. There may be several streams of this type.

View File

@ -602,7 +602,9 @@ void QueryPipelineBuilder::addPipelineBefore(QueryPipelineBuilder pipeline)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Pipeline for CreatingSets should have empty header. Got: {}",
pipeline.getHeader().dumpStructure());
IProcessor::PortNumbers delayed_streams(pipe.numOutputPorts());
bool has_totals = pipe.getTotalsPort();
bool has_extremes = pipe.getExtremesPort();
IProcessor::PortNumbers delayed_streams(pipe.numOutputPorts() + (has_totals ? 1 : 0) + (has_extremes ? 1 : 0));
for (size_t i = 0; i < delayed_streams.size(); ++i)
delayed_streams[i] = i;
@ -613,8 +615,14 @@ void QueryPipelineBuilder::addPipelineBefore(QueryPipelineBuilder pipeline)
pipes.emplace_back(QueryPipelineBuilder::getPipe(std::move(pipeline), resources));
pipe = Pipe::unitePipes(std::move(pipes), collected_processors, true);
auto processor = std::make_shared<DelayedPortsProcessor>(getHeader(), pipe.numOutputPorts(), delayed_streams, true);
addTransform(std::move(processor));
auto processor = std::make_shared<DelayedPortsProcessor>(getHeader(), delayed_streams.size(), delayed_streams, true);
auto in = processor->getInputs().begin();
auto out = processor->getOutputs().begin();
InputPort * totals_in = has_totals ? &*(in++) : nullptr;
InputPort * extremes_in = has_extremes ? &*(in++) : nullptr;
OutputPort * totals_out = has_totals ? &*(out++) : nullptr;
OutputPort * extremes_out = has_extremes ? &*(out++) : nullptr;
pipe.addTransform(std::move(processor), totals_in, extremes_in, totals_out, extremes_out);
}
void QueryPipelineBuilder::setProcessListElement(QueryStatusPtr elem)