From 99476d00390b344f1d619364befd4e5c7af879a1 Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Tue, 3 Sep 2019 14:15:37 +0300 Subject: [PATCH] Add task stealing to PipelineExecutor. --- .../Processors/Executors/PipelineExecutor.cpp | 82 ++++++++++++------- .../Processors/Executors/PipelineExecutor.h | 30 ++----- 2 files changed, 57 insertions(+), 55 deletions(-) diff --git a/dbms/src/Processors/Executors/PipelineExecutor.cpp b/dbms/src/Processors/Executors/PipelineExecutor.cpp index 0cead53feda..5dd8c8c8029 100644 --- a/dbms/src/Processors/Executors/PipelineExecutor.cpp +++ b/dbms/src/Processors/Executors/PipelineExecutor.cpp @@ -437,6 +437,7 @@ void PipelineExecutor::executeSingleThread(size_t thread_num, size_t num_threads Stopwatch total_time_watch; ExecutionState * state = nullptr; + auto & context = executor_contexts[thread_num]; auto prepare_processor = [&](UInt64 pid, Stack & children, Stack & parents) { @@ -474,33 +475,25 @@ void PipelineExecutor::executeSingleThread(size_t thread_num, size_t num_threads /// Just travers graph and prepare any processor. while (!finished) { - auto pushed = task_queue.num_pushed.load(); - bool found = false; - - while (pushed > task_queue.num_popped.load()) + if (!context->task_queue.empty()) { - /// Fast branch. 
- if (task_queue.pop(state)) + state = context->task_queue.front(); + context->task_queue.pop(); + break; + } + + { + auto expected_state = context->state_to_steal.load(); + if (expected_state + && context->state_to_steal.compare_exchange_strong(expected_state, nullptr)) { - found = true; + state = expected_state; break; } } - if (found) - break; - std::unique_lock lock(task_queue_mutex); - auto popped = task_queue.num_popped.load(); - -// if (!task_queue.empty()) -// { -// state = task_queue.front(); -// task_queue.pop(); -// break; -// } - ++num_waiting_threads; if (num_waiting_threads == num_threads) @@ -511,10 +504,30 @@ void PipelineExecutor::executeSingleThread(size_t thread_num, size_t num_threads break; } - task_queue_condvar.wait(lock, [&]() + while (!finished) { - return finished || popped < task_queue.num_pushed.load(); - }); + bool found = false; + + for (auto & exec_context : executor_contexts) + { + auto expected_state = exec_context->state_to_steal.load(); + if (expected_state + && exec_context->state_to_steal.compare_exchange_strong(expected_state, nullptr)) + { + state = expected_state; + found = true; + break; + } + } + + if (found) + break; + + task_queue_condvar.wait_for(lock, std::chrono::milliseconds(1), [&]() + { + return finished.load(); + }); + } --num_waiting_threads; } @@ -547,7 +560,7 @@ void PipelineExecutor::executeSingleThread(size_t thread_num, size_t num_threads { Stack children; Stack parents; - Queue queue; + auto & queue = context->task_queue; ++num_processing_executors; while (auto task = expand_pipeline_task.load()) @@ -570,17 +583,26 @@ void PipelineExecutor::executeSingleThread(size_t thread_num, size_t num_threads if (!queue.empty()) { - /// std::lock_guard lock(task_queue_mutex); + auto task_to_still = queue.front(); + ExecutionState * expected = nullptr; - while (!queue.empty() && !finished) - { - task_queue.push(queue.front()); + if (context->state_to_steal.compare_exchange_strong(expected, task_to_still)) queue.pop(); - 
} - - task_queue_condvar.notify_all(); } +// if (!queue.empty()) +// { +// /// std::lock_guard lock(task_queue_mutex); +// +// while (!queue.empty() && !finished) +// { +// local_queue.push(queue.front()); +// queue.pop(); +// } +// +// /// task_queue_condvar.notify_all(); +// } + --num_processing_executors; while (auto task = expand_pipeline_task.load()) doExpandPipeline(task, false); diff --git a/dbms/src/Processors/Executors/PipelineExecutor.h b/dbms/src/Processors/Executors/PipelineExecutor.h index 18156c8b7c8..3695ca90416 100644 --- a/dbms/src/Processors/Executors/PipelineExecutor.h +++ b/dbms/src/Processors/Executors/PipelineExecutor.h @@ -118,33 +118,10 @@ private: using Stack = std::stack; - struct TaskQueue - { - bool pop(ExecutionState *& state) - { - if (stack.pop(state)) - { - num_popped.fetch_add(1); - return true; - } - - return false; - } - - void push(ExecutionState * state) - { - stack.push(state); - num_pushed.fetch_add(1); - } - - lfs::LFStack stack; - std::atomic num_pushed {0}; - std::atomic num_popped {0}; - }; - + using TaskQueue = std::queue; /// Queue with pointers to tasks. Each thread will concurrently read from it until finished flag is set. /// Stores processors need to be prepared. Preparing status is already set for them. - TaskQueue task_queue; + /// TaskQueue task_queue; std::mutex task_queue_mutex; std::condition_variable task_queue_condvar; @@ -178,6 +155,9 @@ private: /// Will store context for all expand pipeline tasks (it's easy and we don't expect many). /// This can be solved by using atomic shard ptr. std::list task_list; + + std::atomic state_to_steal { nullptr }; + std::queue task_queue; }; std::vector> executor_contexts;