diff --git a/dbms/src/Processors/Transforms/AggregatingTransform.cpp b/dbms/src/Processors/Transforms/AggregatingTransform.cpp index 68441e396f0..bff4aac9a17 100644 --- a/dbms/src/Processors/Transforms/AggregatingTransform.cpp +++ b/dbms/src/Processors/Transforms/AggregatingTransform.cpp @@ -433,9 +433,11 @@ IProcessor::Status AggregatingTransform::prepare() } } - input.setNeeded(); if (!input.hasData()) + { + input.setNeeded(); return Status::NeedData; + } current_chunk = input.pull(/*set_not_needed = */ !is_consume_finished); read_current_chunk = true; diff --git a/dbms/src/Processors/Transforms/MergingSortedTransform.cpp b/dbms/src/Processors/Transforms/MergingSortedTransform.cpp index ddbd91b38d1..3c51a7622a6 100644 --- a/dbms/src/Processors/Transforms/MergingSortedTransform.cpp +++ b/dbms/src/Processors/Transforms/MergingSortedTransform.cpp @@ -74,16 +74,8 @@ IProcessor::Status MergingSortedTransform::prepare() return Status::Finished; } - if (!output.isNeeded()) - { - for (auto & in : inputs) - in.setNotNeeded(); - - return Status::PortFull; - } - - if (output.hasData()) - return Status::PortFull; + /// Do not disable inputs, so it will work in the same way as with AsynchronousBlockInputStream, like before. + bool is_port_full = !output.canPush(); /// Special case for single input. if (inputs.size() == 1) @@ -96,14 +88,20 @@ IProcessor::Status MergingSortedTransform::prepare() } input.setNeeded(); + if (input.hasData()) - output.push(input.pull()); + { + if (!is_port_full) + output.push(input.pull()); + + return Status::PortFull; + } return Status::NeedData; } /// Push if has data. - if (merged_data.mergedRows()) + if (merged_data.mergedRows() && !is_port_full) output.push(merged_data.pull()); if (!is_initialized) @@ -119,7 +117,7 @@ IProcessor::Status MergingSortedTransform::prepare() if (!cursors[i].empty()) { - input.setNotNeeded(); + // input.setNotNeeded(); continue; } @@ -192,6 +190,9 @@ IProcessor::Status MergingSortedTransform::prepare() need_data = false; } + if (is_port_full) + return Status::PortFull; + return Status::Ready; } }