dbms: fixed race condition (still messy) [#METR-11765].

This commit is contained in:
Alexey Milovidov 2014-07-16 05:36:18 +04:00
parent 71935f7916
commit cdbff6e120

View File

@ -2,6 +2,7 @@
#include <list> #include <list>
#include <queue> #include <queue>
#include <atomic>
#include <Poco/Thread.h> #include <Poco/Thread.h>
@ -37,8 +38,7 @@ class UnionBlockInputStream : public IProfilingBlockInputStream
public: public:
UnionBlockInputStream(BlockInputStreams inputs_, unsigned max_threads_ = 1) UnionBlockInputStream(BlockInputStreams inputs_, unsigned max_threads_ = 1)
: max_threads(std::min(inputs_.size(), static_cast<size_t>(max_threads_))), : max_threads(std::min(inputs_.size(), static_cast<size_t>(max_threads_))),
output_queue(max_threads), exhausted_inputs(0), finish(false), output_queue(max_threads)
pushed_end_of_output_queue(false), all_read(false), log(&Logger::get("UnionBlockInputStream"))
{ {
children.insert(children.end(), inputs_.begin(), inputs_.end()); children.insert(children.end(), inputs_.begin(), inputs_.end());
@ -211,7 +211,7 @@ private:
{ {
current_memory_tracker = memory_tracker; current_memory_tracker = memory_tracker;
ExceptionPtr exception; ExceptionPtr exception;
try try
{ {
loop(); loop();
@ -223,6 +223,9 @@ private:
if (exception) if (exception)
{ {
/// Отдаём эксепшен в основной поток.
parent.output_queue.push(exception);
try try
{ {
parent.cancel(); parent.cancel();
@ -234,9 +237,6 @@ private:
* - то пофиг. * - то пофиг.
*/ */
} }
/// Отдаём эксепшен в основной поток.
parent.output_queue.push(exception);
} }
} }
@ -292,14 +292,9 @@ private:
if (parent.finish) if (parent.finish)
{ {
Poco::ScopedLock<Poco::FastMutex> lock(parent.mutex); /// Отдаём в основной поток пустой блок, что означает, что данных больше нет; только один раз.
if (false == parent.pushed_end_of_output_queue.exchange(true))
/// Отдаём в основной поток пустой блок, что означает, что данных больше нет.
if (!parent.pushed_end_of_output_queue)
{
parent.pushed_end_of_output_queue = true;
parent.output_queue.push(OutputData()); parent.output_queue.push(OutputData());
}
} }
} }
@ -334,19 +329,20 @@ private:
typedef ConcurrentBoundedQueue<OutputData> OutputQueue; typedef ConcurrentBoundedQueue<OutputData> OutputQueue;
OutputQueue output_queue; OutputQueue output_queue;
/// Для операций с очередями. /// Для операций с input_queue.
Poco::FastMutex mutex; Poco::FastMutex mutex;
/// Сколько источников иссякло. /// Сколько источников иссякло.
size_t exhausted_inputs; size_t exhausted_inputs = 0;
/// Завершить работу потоков (раньше, чем иссякнут источники). /// Завершить работу потоков (раньше, чем иссякнут источники).
volatile bool finish; std::atomic<bool> finish { false };
/// Положили ли в output_queue пустой блок. /// Положили ли в output_queue пустой блок.
volatile bool pushed_end_of_output_queue; std::atomic<bool> pushed_end_of_output_queue { false };
bool all_read;
Logger * log; bool all_read { false };
Logger * log = &Logger::get("UnionBlockInputStream");
}; };
} }