dbms: more uniform aggregation: development [#METR-2944].

This commit is contained in:
Alexey Milovidov 2014-12-14 11:27:22 +03:00
parent 562e4f3a3e
commit 99ba283688
2 changed files with 60 additions and 23 deletions

View File

@ -33,6 +33,7 @@ struct ParallelInputsHandler
void onBlock(Block & block, size_t thread_num) {}
/// Блоки закончились. Из-за того, что все источники иссякли или из-за отмены работы.
/// Этот метод всегда вызывается ровно один раз, в конце работы, если метод onException не кидает исключение.
void onFinish() {}
/// Обработка исключения. Разумно вызывать в этом методе метод ParallelInputsProcessor::cancel, а также передавать эксепшен в основной поток.
@ -145,6 +146,12 @@ private:
{
handler.onException(exception, thread_num);
}
/// Последний поток при выходе сообщает, что данных больше нет.
if (0 == --active_threads)
{
handler.onFinish();
}
}
void loop(size_t thread_num)
@ -196,12 +203,6 @@ private:
handler.onBlock(block, thread_num);
}
}
/// Если не было исключений, последний поток при выходе сообщает, что данных больше нет.
if (0 == --active_threads)
{
handler.onFinish();
}
}
BlockInputStreams inputs;

View File

@ -67,7 +67,7 @@ public:
}
catch (...)
{
LOG_ERROR(log, "Exception while destroying UnionBlockInputStream.");
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
@ -79,7 +79,7 @@ public:
if (!__sync_bool_compare_and_swap(&is_cancelled, false, true))
return;
// std::cerr << "cancelling\n";
//std::cerr << "cancelling\n";
processor.cancel();
}
@ -92,12 +92,48 @@ protected:
LOG_TRACE(log, "Waiting for threads to finish");
output_queue.clear();
ExceptionPtr exception;
if (!all_read)
{
/** Прочитаем всё до конца, чтобы ParallelInputsProcessor не заблокировался при попытке вставить в очередь.
* Может быть, в очереди есть ещё эксепшен.
*/
OutputData res;
while (true)
{
//std::cerr << "popping\n";
output_queue.pop(res);
if (res.exception)
{
if (!exception)
exception = res.exception;
else if (DB::Exception * e = dynamic_cast<DB::Exception *>(&*exception))
e->addMessage("\n" + res.exception->displayText());
}
else if (!res.block)
break;
}
all_read = true;
}
processor.wait();
LOG_TRACE(log, "Waited for threads to finish");
if (exception)
exception->rethrow();
}
/** Возможны следующие варианты:
* 1. Функция readImpl вызывается до тех пор, пока она не вернёт пустой блок.
* Затем вызывается функция readSuffix и затем деструктор.
* 2. Вызывается функция readImpl. В какой-то момент, возможно из другого потока вызывается функция cancel.
* Затем вызывается функция readSuffix и затем деструктор.
* 3. В любой момент, объект может быть и так уничтожен (вызываться деструктор).
*/
Block readImpl() override
{
OutputData res;
@ -112,7 +148,7 @@ protected:
}
/// Будем ждать, пока будет готов следующий блок или будет выкинуто исключение.
// std::cerr << "popping\n";
//std::cerr << "popping\n";
output_queue.pop(res);
if (res.exception)
@ -124,18 +160,13 @@ protected:
return res.block;
}
/// Вызывается либо после того, как всё прочитано, либо после cancel-а.
void readSuffix() override
{
// std::cerr << "readSuffix\n";
//std::cerr << "readSuffix\n";
if (!all_read && !is_cancelled)
throw Exception("readSuffix called before all data is read", ErrorCodes::LOGICAL_ERROR);
/// Может быть, в очереди есть ещё эксепшен.
OutputData res;
while (output_queue.tryPop(res))
if (res.exception)
res.exception->rethrow();
finalize();
for (size_t i = 0; i < children.size(); ++i)
@ -154,7 +185,12 @@ private:
OutputData(ExceptionPtr & exception_) : exception(exception_) {}
};
/// Очередь готовых блоков. Также туда можно положить эксепшен вместо блока.
/** Очередь готовых блоков. Также туда можно положить эксепшен вместо блока.
* Когда данные закончатся - в очередь вставляется пустой блок.
* В очередь всегда (даже после исключения или отмены запроса) рано или поздно вставляется пустой блок.
* Очередь всегда (даже после исключения или отмены запроса, даже в деструкторе) нужно дочитывать до пустого блока,
* иначе ParallelInputsProcessor может заблокироваться при вставке в очередь.
*/
typedef ConcurrentBoundedQueue<OutputData> OutputQueue;
OutputQueue output_queue;
@ -165,26 +201,26 @@ private:
void onBlock(Block & block, size_t thread_num)
{
// std::cerr << "pushing block\n";
//std::cerr << "pushing block\n";
parent.output_queue.push(block);
}
void onFinish()
{
// std::cerr << "pushing end\n";
//std::cerr << "pushing end\n";
parent.output_queue.push(OutputData());
}
void onException(ExceptionPtr & exception, size_t thread_num)
{
// std::cerr << "pushing exception\n";
//std::cerr << "pushing exception\n";
/// Порядок строк имеет значение. Если его поменять, то возможна ситуация,
/// когда перед эксепшеном, в очередь окажется вставлен пустой блок (конец данных),
/// затем быстро отработает функция readSuffix и эксепшен потеряется.
/// и эксепшен потеряется.
parent.output_queue.push(exception);
parent.cancel();
parent.cancel(); /// Не кидает исключений.
}
UnionBlockInputStream & parent;