From 73c052f5765c31c0077a1a41199ff77b8462b6a0 Mon Sep 17 00:00:00 2001
From: Nikolai Kochetov
Date: Tue, 3 Sep 2019 21:05:44 +0300
Subject: [PATCH] Revert PipelineExecutor changes.

---
 .../Processors/Executors/PipelineExecutor.cpp | 79 ++++----------------
 .../Processors/Executors/PipelineExecutor.h   |  7 +-
 2 files changed, 18 insertions(+), 68 deletions(-)

diff --git a/dbms/src/Processors/Executors/PipelineExecutor.cpp b/dbms/src/Processors/Executors/PipelineExecutor.cpp
index 364bfea89ba..cc61b4a152b 100644
--- a/dbms/src/Processors/Executors/PipelineExecutor.cpp
+++ b/dbms/src/Processors/Executors/PipelineExecutor.cpp
@@ -437,7 +437,6 @@ void PipelineExecutor::executeSingleThread(size_t thread_num, size_t num_threads
     Stopwatch total_time_watch;
     ExecutionState * state = nullptr;
 
-    auto & context = executor_contexts[thread_num];
 
     auto prepare_processor = [&](UInt64 pid, Stack & children, Stack & parents)
     {
@@ -475,47 +474,15 @@ void PipelineExecutor::executeSingleThread(size_t thread_num, size_t num_threads
         /// Just travers graph and prepare any processor.
         while (!finished)
         {
-            /// Fast way. Get task from local queue.
-            if (!context->task_queue.empty())
+            std::unique_lock lock(task_queue_mutex);
+
+            if (!task_queue.empty())
             {
-                state = context->task_queue.front();
-                context->task_queue.pop();
+                state = task_queue.front();
+                task_queue.pop();
                 break;
             }
 
-            /// Steal from current thread if can.
-            {
-                auto expected_state = context->state_to_steal.load();
-                if (expected_state
-                    && context->state_to_steal.compare_exchange_strong(expected_state, nullptr))
-                {
-                    state = expected_state;
-                    break;
-                }
-            }
-
-            /// Steal from any other thread.
-            {
-                bool found = false;
-
-                for (auto & exec_context : executor_contexts)
-                {
-                    auto expected_state = exec_context->state_to_steal.load();
-                    if (expected_state
-                        && exec_context->state_to_steal.compare_exchange_strong(expected_state, nullptr))
-                    {
-                        state = expected_state;
-                        found = true;
-                        break;
-                    }
-                }
-
-                if (found)
-                    break;
-            }
-
-            std::unique_lock lock(task_queue_mutex);
-
             ++num_waiting_threads;
 
             if (num_waiting_threads == num_threads)
@@ -526,9 +493,9 @@ void PipelineExecutor::executeSingleThread(size_t thread_num, size_t num_threads
                 break;
             }
 
-            task_queue_condvar.wait_for(lock, std::chrono::milliseconds(1), [&]()
+            task_queue_condvar.wait(lock, [&]()
             {
-                return finished.load();
+                return finished || !task_queue.empty();
            });
 
             --num_waiting_threads;
@@ -562,7 +529,7 @@ void PipelineExecutor::executeSingleThread(size_t thread_num, size_t num_threads
             {
                 Stack children;
                 Stack parents;
-                auto & queue = context->task_queue;
+                Queue queue;
 
                 ++num_processing_executors;
                 while (auto task = expand_pipeline_task.load())
@@ -585,25 +552,16 @@ void PipelineExecutor::executeSingleThread(size_t thread_num, size_t num_threads
 
                 if (!queue.empty())
                 {
-                    auto task_to_still = queue.front();
-                    ExecutionState * expected = nullptr;
+                    std::lock_guard lock(task_queue_mutex);
 
-                    if (context->state_to_steal.compare_exchange_strong(expected, task_to_still))
+                    while (!queue.empty() && !finished)
+                    {
+                        task_queue.push(queue.front());
                         queue.pop();
-                }
+                    }
 
-//                if (!queue.empty())
-//                {
-//                    /// std::lock_guard lock(task_queue_mutex);
-//
-//                    while (!queue.empty() && !finished)
-//                    {
-//                        local_queue.push(queue.front());
-//                        queue.pop();
-//                    }
-//
-//                    /// task_queue_condvar.notify_all();
-//                }
+                    task_queue_condvar.notify_all();
+                }
 
                 --num_processing_executors;
                 while (auto task = expand_pipeline_task.load())
@@ -654,7 +612,6 @@ void PipelineExecutor::executeImpl(size_t num_threads)
     {
         std::lock_guard lock(task_queue_mutex);
 
-        size_t cur_thread = 0;
 
         while (!stack.empty())
         {
@@ -664,11 +621,7 @@ void PipelineExecutor::executeImpl(size_t num_threads)
             if (prepareProcessor(proc, stack, stack, 0, false))
             {
                 auto cur_state = graph[proc].execution_state.get();
-                executor_contexts[cur_thread]->task_queue.push(cur_state);
-
-                ++cur_thread;
-                if (cur_thread == num_threads)
-                    cur_thread = 0;
+                task_queue.push(cur_state);
             }
         }
 
diff --git a/dbms/src/Processors/Executors/PipelineExecutor.h b/dbms/src/Processors/Executors/PipelineExecutor.h
index 3695ca90416..9c722cc69c3 100644
--- a/dbms/src/Processors/Executors/PipelineExecutor.h
+++ b/dbms/src/Processors/Executors/PipelineExecutor.h
@@ -3,7 +3,6 @@
 #include
 #include
 #include
-#include
 #include
 #include
 #include
@@ -119,9 +118,10 @@ private:
     using Stack = std::stack<UInt64>;
 
     using TaskQueue = std::queue<ExecutionState *>;
 
+    /// Queue with pointers to tasks. Each thread will concurrently read from it until finished flag is set.
     /// Stores processors need to be prepared. Preparing status is already set for them.
-    /// TaskQueue task_queue;
+    TaskQueue task_queue;
 
     std::mutex task_queue_mutex;
     std::condition_variable task_queue_condvar;
@@ -155,9 +155,6 @@ private:
         /// Will store context for all expand pipeline tasks (it's easy and we don't expect many).
         /// This can be solved by using atomic shard ptr.
        std::list<ExpandPipelineTask> task_list;
-
-        std::atomic<ExecutionState *> state_to_steal { nullptr };
-        std::queue<ExecutionState *> task_queue;
     };
 
     std::vector<std::unique_ptr<ExecutorContext>> executor_contexts;
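
Note on the design this patch reverts to: instead of per-thread local queues plus a lock-free state_to_steal slot that other workers try to claim with compare_exchange_strong, every thread goes back to a single task_queue protected by task_queue_mutex. Idle workers block on task_queue_condvar with a plain wait() whose predicate is "finished or queue not empty", rather than polling with a 1 ms wait_for() and a finished-only predicate, and producers push under the same mutex and then call notify_all().

The sketch below is NOT ClickHouse code; it is a minimal, self-contained illustration of that shared mutex/condvar queue shape. The names (SharedTaskQueue, push, pop, finish) are hypothetical, and int stands in for the ExecutionState pointers the real executor queues.

// Minimal sketch (not ClickHouse code) of the pattern the revert restores:
// one queue + one mutex + one condition variable shared by all worker threads.
#include <condition_variable>
#include <cstdio>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

struct SharedTaskQueue
{
    std::mutex mutex;
    std::condition_variable condvar;
    std::queue<int> tasks;      // stand-in for ExecutionState * pointers
    bool finished = false;

    void push(int task)
    {
        {
            std::lock_guard lock(mutex);
            tasks.push(task);
        }
        condvar.notify_one();
    }

    /// Returns false once the executor is finished and the queue is drained.
    bool pop(int & task)
    {
        std::unique_lock lock(mutex);
        condvar.wait(lock, [&] { return finished || !tasks.empty(); });

        if (tasks.empty())
            return false;

        task = tasks.front();
        tasks.pop();
        return true;
    }

    void finish()
    {
        {
            std::lock_guard lock(mutex);
            finished = true;
        }
        condvar.notify_all();
    }
};

int main()
{
    SharedTaskQueue queue;
    std::vector<std::thread> workers;

    for (int i = 0; i < 4; ++i)
        workers.emplace_back([&queue, i]
        {
            int task;
            while (queue.pop(task))
                std::printf("thread %d got task %d\n", i, task);
        });

    for (int task = 0; task < 16; ++task)
        queue.push(task);

    queue.finish();
    for (auto & worker : workers)
        worker.join();
}

One point visible in the diff itself: the predicate passed to wait() now also checks the queue, so a task pushed between unlocking and sleeping cannot be missed, whereas the reverted wait_for() variant effectively compensated for its finished-only predicate by waking every millisecond.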