Added ThreadsQueue to PipelineExecutor.

This commit is contained in:
Nikolai Kochetov 2019-09-10 11:45:48 +03:00
parent 5165bbd88a
commit 73bda85998
2 changed files with 61 additions and 19 deletions

View File

@ -498,9 +498,7 @@ void PipelineExecutor::executeSingleThread(size_t thread_num, size_t num_threads
if (!task_queue.empty() && !threads_queue.empty())
{
auto thread_to_wake = *threads_queue.begin();
threads_queue.erase(threads_queue.begin());
--num_waiting_threads;
auto thread_to_wake = threads_queue.pop_front();
lock.unlock();
std::lock_guard guard(executor_contexts[thread_to_wake]->mutex);
@ -511,16 +509,14 @@ void PipelineExecutor::executeSingleThread(size_t thread_num, size_t num_threads
break;
}
++num_waiting_threads;
if (num_waiting_threads == num_threads)
if (threads_queue.size() + 1 == num_threads)
{
lock.unlock();
finish();
break;
}
threads_queue.insert(thread_num);
threads_queue.push(thread_num);
}
{
@ -597,11 +593,9 @@ void PipelineExecutor::executeSingleThread(size_t thread_num, size_t num_threads
{
std::unique_lock lock(task_queue_mutex);
auto it = threads_queue.find(stream);
if (it != threads_queue.end())
if (threads_queue.has(stream))
{
threads_queue.erase(it);
--num_waiting_threads;
threads_queue.pop(stream);
found_in_queue = true;
}
}
@ -635,10 +629,7 @@ void PipelineExecutor::executeSingleThread(size_t thread_num, size_t num_threads
if (!threads_queue.empty())
{
auto it = threads_queue.begin();
auto thread_to_wake = *it;
threads_queue.erase(it);
--num_waiting_threads;
auto thread_to_wake = threads_queue.pop_front();
lock.unlock();
std::lock_guard guard(executor_contexts[thread_to_wake]->mutex);

View File

@ -122,7 +122,61 @@ private:
/// Queue with pointers to tasks. Each thread will concurrently read from it until finished flag is set.
/// Stores processors need to be prepared. Preparing status is already set for them.
TaskQueue task_queue;
std::set<size_t> threads_queue;
struct ThreadsQueue
{
void init(size_t num_threads)
{
queue_size = 0;
is_thread_in_queue.assign(num_threads, false);
}
bool has(size_t pos) const { return is_thread_in_queue[pos]; }
size_t size() const { return queue_size; }
bool empty() const { return queue_size == 0; }
void push(size_t pos)
{
if (unlikely(has(pos)))
throw Exception("Can't push thread because it is already in threads queue.", ErrorCodes::LOGICAL_ERROR);
++queue_size;
is_thread_in_queue[pos] = true;
}
void pop(size_t pos)
{
if (unlikely(!has(pos)))
throw Exception("Can't pop thread because it is not in threads queue.", ErrorCodes::LOGICAL_ERROR);
--queue_size;
is_thread_in_queue[pos] = false;
}
size_t pop_front()
{
if (unlikely(queue_size == 0))
throw Exception("Can't pop from empty queue.", ErrorCodes::LOGICAL_ERROR);
for (size_t i = 0; i < is_thread_in_queue.size(); ++i)
{
if (has(i))
{
is_thread_in_queue[i] = false;
--queue_size;
return i;
}
}
throw Exception("Can't find any thread in queue.", ErrorCodes::LOGICAL_ERROR);
}
private:
std::vector<char> is_thread_in_queue;
size_t queue_size = 0;
};
ThreadsQueue threads_queue;
std::mutex task_queue_mutex;
std::atomic_bool cancelled;
@ -130,9 +184,6 @@ private:
Poco::Logger * log = &Poco::Logger::get("PipelineExecutor");
/// Num threads waiting condvar. Last thread finish execution if task_queue is empty.
size_t num_waiting_threads = 0;
/// Things to stop execution to expand pipeline.
struct ExpandPipelineTask
{