dbms: better [#METR-17000].

This commit is contained in:
Alexey Milovidov 2015-12-02 00:20:14 +03:00
parent b04fc9bdf0
commit d76d57dda2
5 changed files with 43 additions and 11 deletions

View File

@ -102,14 +102,6 @@ protected:
* то читаем и мерджим их, расходуя минимальное количество памяти.
*/
/// Сбросим имеющиеся в оперативке данные тоже на диск. Так проще. NOTE Это можно делать параллельно.
for (AggregatedDataVariantsPtr & data : many_data)
{
size_t rows = data->sizeWithoutOverflowRow();
if (rows)
aggregator.writeToTemporaryFile(*data, rows);
}
const auto & files = aggregator.getTemporaryFiles();
BlockInputStreams input_streams;
for (const auto & file : files.files)
@ -211,8 +203,31 @@ private:
parent.threads_data[thread_num].src_bytes += block.bytes();
}
void onFinishThread(size_t thread_num)
{
if (parent.aggregator.hasTemporaryFiles())
{
/// Сбросим имеющиеся в оперативке данные тоже на диск. Так проще их потом объединять.
auto & data = *parent.many_data[thread_num];
size_t rows = data.sizeWithoutOverflowRow();
if (rows)
parent.aggregator.writeToTemporaryFile(data, rows);
}
}
void onFinish()
{
if (parent.aggregator.hasTemporaryFiles())
{
/// Может так получиться, что какие-то данные ещё не сброшены на диск,
/// потому что во время вызова onFinishThread ещё никакие данные не были сброшены на диск, а потом какие-то - были.
for (auto & data : parent.many_data)
{
size_t rows = data->sizeWithoutOverflowRow();
if (rows)
parent.aggregator.writeToTemporaryFile(*data, rows);
}
}
}
void onException(std::exception_ptr & exception, size_t thread_num)

View File

@ -43,6 +43,11 @@ struct ParallelInputsHandler
/// Обработка блока данных + дополнительных информаций.
void onBlock(Block & block, BlockExtraInfo & extra_info, size_t thread_num) {}
/// Вызывается для каждого потока, когда потоку стало больше нечего делать.
/// Из-за того, что иссякла часть источников, и сейчас источников осталось меньше, чем потоков.
/// Вызывается, если метод onException не кидает исключение; вызывается до метода onFinish.
void onFinishThread(size_t thread_num) {}
/// Блоки закончились. Из-за того, что все источники иссякли или из-за отмены работы.
/// Этот метод всегда вызывается ровно один раз, в конце работы, если метод onException не кидает исключение.
void onFinish() {}
@ -182,6 +187,8 @@ private:
handler.onException(exception, thread_num);
}
handler.onFinishThread(thread_num);
/// Последний поток при выходе сообщает, что данных больше нет.
if (0 == --active_threads)
{

View File

@ -271,6 +271,10 @@ private:
parent.output_queue.push(Payload());
}
void onFinishThread(size_t thread_num)
{
}
void onException(std::exception_ptr & exception, size_t thread_num)
{
//std::cerr << "pushing exception\n";

View File

@ -880,14 +880,20 @@ public:
/// Для внешней агрегации.
void writeToTemporaryFile(AggregatedDataVariants & data_variants, size_t rows);
bool hasTemporaryFiles() const { return !temporary_files.files.empty(); }
bool hasTemporaryFiles() const { return !temporary_files.empty(); }
struct TemporaryFiles
{
std::vector<std::unique_ptr<Poco::TemporaryFile>> files;
size_t sum_size_uncompressed = 0;
size_t sum_size_compressed = 0;
std::mutex mutex;
mutable std::mutex mutex;
bool empty() const
{
std::lock_guard<std::mutex> lock(mutex);
return files.empty();
}
};
const TemporaryFiles & getTemporaryFiles() const { return temporary_files; }

View File

@ -2087,7 +2087,7 @@ Block Aggregator::mergeBlocks(BlocksList & blocks, bool final)
result.keys_size = params.keys_size;
result.key_sizes = key_sizes;
LOG_TRACE(log, "Merging partially aggregated blocks.");
LOG_TRACE(log, "Merging partially aggregated blocks (bucket = " << blocks.front().info.bucket_num << ").");
for (Block & block : blocks)
{