2018-05-24 02:39:22 +00:00
|
|
|
#include <Processors/ForkProcessor.h>
|
|
|
|
|
|
|
|
|
|
|
|
namespace DB
|
|
|
|
{
|
|
|
|
|
|
|
|
/// Takes whatever the first input port delivers and hands it to every output port
/// that has not finished yet (a clone for all but the last one, the original for the last).
///
/// Return value:
///  - Status::Finished  when no output is left unfinished (the input is closed), or
///                      when the input reports it is finished (all outputs are finished then);
///  - Status::PortFull  when some unfinished output reports it cannot be pushed to
///                      (the input is marked as not needed);
///  - Status::NeedData  when the input holds no data yet, and also after a transfer.
ForkProcessor::Status ForkProcessor::prepare()
{
    auto & source = inputs.front();

    /// Step 1: examine the outputs.

    size_t num_active_outputs = 0;
    bool has_blocked_output = false;

    for (const auto & output : outputs)
    {
        if (output.isFinished())
            continue;

        ++num_active_outputs;

        /// Keep this order: isFinished() is queried before canPush().
        if (!output.canPush())
            has_blocked_output = true;
    }

    /// Zero active outputs means every output is finished (or there are none at all).
    if (num_active_outputs == 0)
    {
        source.close();
        return Status::Finished;
    }

    if (has_blocked_output)
    {
        source.setNotNeeded();
        return Status::PortFull;
    }

    /// Step 2: examine the input.

    if (source.isFinished())
    {
        for (auto & output : outputs)
            output.finish();

        return Status::Finished;
    }

    source.setNeeded();

    if (!source.hasData())
        return Status::NeedData;

    /// Step 3: transfer the data.

    auto data = source.pull();

    /// Counts down the active outputs still waiting for their copy; the one that brings
    /// it to zero receives the original object instead of a clone.
    size_t num_remaining_outputs = num_active_outputs;

    for (auto & output : outputs)
    {
        /// Finished outputs get nothing.
        if (output.isFinished())
            continue;

        --num_remaining_outputs;

        if (num_remaining_outputs == 0)
            output.push(std::move(data)); // NOLINT Can push because no full or unneeded outputs.
        else
            output.push(data.clone());
    }

    /// The input was just pulled from, so it must be empty now.
    return Status::NeedData;
}
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|