Add stream enumeration to Processors.

This commit is contained in:
Nikolai Kochetov 2019-09-02 20:01:03 +03:00
parent cfbb3ce6f9
commit 52ca3f2b4c

View File

@ -126,14 +126,14 @@ private:
if (stream >= queues.size()) if (stream >= queues.size())
stream = queues.size() - 1; stream = queues.size() - 1;
std::lock_guard lg(mutexes[stream]); std::lock_guard lg(*mutexes[stream]);
queues[stream].push(state); queues[stream].push(state);
} }
ExecutionState * pop(size_t stream) ExecutionState * pop(size_t stream)
{ {
{ {
std::lock_guard lg(mutexes[stream]); std::lock_guard lg(*mutexes[stream]);
if (!queues[stream].empty()) if (!queues[stream].empty())
{ {
@ -156,7 +156,7 @@ private:
bool empty(size_t stream) bool empty(size_t stream)
{ {
std::lock_guard lg(mutexes[stream]); std::lock_guard lg(*mutexes[stream]);
return queues[stream].empty(); return queues[stream].empty();
} }
@ -165,11 +165,11 @@ private:
queues.resize(num_streams + 1); queues.resize(num_streams + 1);
for (size_t i = 0; i <= num_streams; ++i) for (size_t i = 0; i <= num_streams; ++i)
mutexes.emplace_back(); mutexes.emplace_back(std::make_unique<std::mutex>());
} }
std::vector<std::queue<ExecutionState *>> queues; std::vector<std::queue<ExecutionState *>> queues;
std::vector<std::mutex> mutexes; std::vector<std::unique_ptr<std::mutex>> mutexes;
}; };
/// Queue with pointers to tasks. Each thread will concurrently read from it until finished flag is set. /// Queue with pointers to tasks. Each thread will concurrently read from it until finished flag is set.