Updates in AggregatingTransform

This commit is contained in:
Nikolai Kochetov 2020-01-16 15:50:04 +03:00
parent 5aa503beb3
commit f6b9d320a9
2 changed files with 4 additions and 1 deletions

View File

@ -628,7 +628,7 @@ void PipelineExecutor::executeSingleThread(size_t thread_num, size_t num_threads
queue.pop();
}
if (!threads_queue.empty() /* && task_queue.quota() > threads_queue.size()*/)
if (!threads_queue.empty() && !finished /* && task_queue.quota() > threads_queue.size()*/)
{
auto thread_to_wake = task_queue.getAnyThreadWithTasks(thread_num + 1 == num_threads ? 0 : (thread_num + 1));

View File

@ -439,6 +439,9 @@ IProcessor::Status AggregatingTransform::prepare()
return Status::NeedData;
}
if (is_consume_finished)
input.setNeeded();
current_chunk = input.pull(/*set_not_needed = */ !is_consume_finished);
read_current_chunk = true;