// Source: ClickHouse/src/Processors/ConcatProcessor.cpp
// (65 lines, 1.2 KiB, C++; captured from a repository web viewer: Raw / Normal View / History.)

#include <Processors/ConcatProcessor.h>
namespace DB
{
/// Builds the processor with `InputPorts(num_inputs, header)` as its inputs and a
/// single-element `OutputPorts{header}` as its outputs.
/// Reading starts from the first input: `current_input` is set to `inputs.begin()`.
ConcatProcessor::ConcatProcessor(const Block & header, size_t num_inputs)
    : IProcessor(InputPorts(num_inputs, header), OutputPorts{header})
    , current_input(inputs.begin())
{
}
/// Advances the processor by moving at most one piece of data from the current
/// input port to the output port (`outputs.front()`).
///
/// Inputs are consumed strictly in order: `current_input` only ever moves forward,
/// and the next input is considered only once the current one reports isFinished().
///
/// Returns:
///  - Status::Finished  when the output is already finished (every input from
///                      `current_input` onwards is closed first), or when all inputs
///                      are finished (the output is finished first);
///  - Status::PortFull  when the output is not needed, cannot be pushed to, or has
///                      just received data from the current input;
///  - Status::NeedData  when the current input has been marked as needed but has
///                      no data yet.
ConcatProcessor::Status ConcatProcessor::prepare()
{
    auto & output = outputs.front();

    /// Check can output.

    if (output.isFinished())
    {
        /// Nobody will read the result, so close the current input and all inputs after it.
        for (; current_input != inputs.end(); ++current_input)
            current_input->close();

        return Status::Finished;
    }

    if (!output.isNeeded())
    {
        /// Propagate "not needed" to the input we are currently reading from, if any is left.
        if (current_input != inputs.end())
            current_input->setNotNeeded();

        return Status::PortFull;
    }

    if (!output.canPush())
        return Status::PortFull;

    /// Check can input.

    /// Skip inputs that are already finished; this is how reading moves on to the next input.
    while (current_input != inputs.end() && current_input->isFinished())
        ++current_input;

    if (current_input == inputs.end())
    {
        /// Every input is finished, so there is nothing more to output.
        output.finish();
        return Status::Finished;
    }

    auto & input = *current_input;

    input.setNeeded();

    if (!input.hasData())
        return Status::NeedData;

    /// Move data.
    output.push(input.pull());

    /// Now, we pushed to output, and it must be full.
    return Status::PortFull;
}
}