Remove unused code

This commit is contained in:
Dmitry Novik 2021-12-14 22:44:54 +03:00
parent 56a3f4a000
commit e8a96b53b1
4 changed files with 0 additions and 58 deletions

View File

@ -432,55 +432,6 @@ void Pipe::addTransform(ProcessorPtr transform)
addTransform(std::move(transform), static_cast<OutputPort *>(nullptr), static_cast<OutputPort *>(nullptr)); addTransform(std::move(transform), static_cast<OutputPort *>(nullptr), static_cast<OutputPort *>(nullptr));
} }
void Pipe::addParallelTransforms(Processors transforms)
{
LOG_DEBUG(log, "Begin addParallelTransforms, have {} transforms", transforms.size());
if (output_ports.empty())
throw Exception("Cannot add parallel transforms to empty Pipe.", ErrorCodes::LOGICAL_ERROR);
std::vector<InputPort *> inputs;
std::vector<OutputPort *> outputs;
for (const auto & transform : transforms)
{
auto & current_transform_inputs = transform->getInputs();
if (current_transform_inputs.size() != 1)
throw Exception("Each parallel transform should have one input port", ErrorCodes::LOGICAL_ERROR);
inputs.push_back(&(current_transform_inputs.front()));
auto & current_transform_outputs = transform->getOutputs();
if (current_transform_outputs.size() != 1)
throw Exception("Each parallel transform should have one output port", ErrorCodes::LOGICAL_ERROR);
outputs.push_back(&(current_transform_outputs.front()));
LOG_DEBUG(log, "addParallelTransforms, added inputs and outputs for processor {}", transform->getName());
LOG_DEBUG(log, "output structure: {}", transform->getOutputs().front().getHeader().dumpStructure());
}
if (inputs.size() != output_ports.size())
throw Exception("Cannot add parallel transforms to Pipes because " +
std::to_string(transforms.size()) + " transforms were passed, "
"but " + std::to_string(output_ports.size()) + " expected", ErrorCodes::LOGICAL_ERROR);
for (size_t i = 0; i < inputs.size(); ++i)
connect(*output_ports[i], *inputs[i]);
output_ports = std::move(outputs);
/// do not check output formats because they are different in case of parallel aggregations
LOG_DEBUG(log, "addParallelTransforms do not check format");
if (collected_processors)
collected_processors->insert(collected_processors->end(), transforms.begin(), transforms.end());
for (auto & transform_ptr : transforms)
processors.emplace_back(std::move(transform_ptr));
/// Should not change streams number, so maybe not need max_parallel_streams update
max_parallel_streams = std::max<size_t>(max_parallel_streams, output_ports.size());
}
void Pipe::addTransform(ProcessorPtr transform, OutputPort * totals, OutputPort * extremes) void Pipe::addTransform(ProcessorPtr transform, OutputPort * totals, OutputPort * extremes)
{ {
if (output_ports.empty()) if (output_ports.empty())

View File

@ -64,7 +64,6 @@ public:
/// Output ports should have same headers. /// Output ports should have same headers.
/// If totals or extremes are not empty, transform shouldn't change header. /// If totals or extremes are not empty, transform shouldn't change header.
void addTransform(ProcessorPtr transform); void addTransform(ProcessorPtr transform);
void addParallelTransforms(Processors transform);
void addTransform(ProcessorPtr transform, OutputPort * totals, OutputPort * extremes); void addTransform(ProcessorPtr transform, OutputPort * totals, OutputPort * extremes);
void addTransform(ProcessorPtr transform, InputPort * totals, InputPort * extremes); void addTransform(ProcessorPtr transform, InputPort * totals, InputPort * extremes);

View File

@ -155,12 +155,6 @@ void QueryPipelineBuilder::transform(const Transformer & transformer)
pipe.transform(transformer); pipe.transform(transformer);
} }
void QueryPipelineBuilder::addParallelTransforms(Processors transforms)
{
checkInitializedAndNotCompleted();
pipe.addParallelTransforms(transforms);
}
void QueryPipelineBuilder::setSinks(const Pipe::ProcessorGetterWithStreamKind & getter) void QueryPipelineBuilder::setSinks(const Pipe::ProcessorGetterWithStreamKind & getter)
{ {
checkInitializedAndNotCompleted(); checkInitializedAndNotCompleted();

View File

@ -67,8 +67,6 @@ public:
/// Transform pipeline in general way. /// Transform pipeline in general way.
void transform(const Transformer & transformer); void transform(const Transformer & transformer);
/// Add transforms and connect it to outputs streams
void addParallelTransforms(Processors transform);
/// Add TotalsHavingTransform. Resize pipeline to single input. Adds totals port. /// Add TotalsHavingTransform. Resize pipeline to single input. Adds totals port.
void addTotalsHavingTransform(ProcessorPtr transform); void addTotalsHavingTransform(ProcessorPtr transform);
/// Add transform which calculates extremes. This transform adds extremes port and doesn't change inputs number. /// Add transform which calculates extremes. This transform adds extremes port and doesn't change inputs number.