From 88443344238c145afc8271eb369dacd40f7dcfea Mon Sep 17 00:00:00 2001
From: Alexey Milovidov
Date: Tue, 1 Dec 2015 19:58:15 +0300
Subject: [PATCH] dbms: external aggregation: development [#METR-17000].

---
 .../ParallelAggregatingBlockInputStream.h       |  7 +++++--
 dbms/include/DB/Interpreters/Aggregator.h       | 12 +++++++++---
 .../DataStreams/AggregatingBlockInputStream.cpp |  7 +++++--
 dbms/src/Interpreters/Aggregator.cpp            | 15 ++++++++++-----
 .../00284_external_aggregation.reference        |  2 ++
 .../0_stateless/00284_external_aggregation.sql  |  5 +++++
 6 files changed, 36 insertions(+), 12 deletions(-)
 create mode 100644 dbms/tests/queries/0_stateless/00284_external_aggregation.reference
 create mode 100644 dbms/tests/queries/0_stateless/00284_external_aggregation.sql

diff --git a/dbms/include/DB/DataStreams/ParallelAggregatingBlockInputStream.h b/dbms/include/DB/DataStreams/ParallelAggregatingBlockInputStream.h
index 499580dcb52..d2da7a2b6f3 100644
--- a/dbms/include/DB/DataStreams/ParallelAggregatingBlockInputStream.h
+++ b/dbms/include/DB/DataStreams/ParallelAggregatingBlockInputStream.h
@@ -112,13 +112,16 @@ protected:
 			const auto & files = aggregator.getTemporaryFiles();
 			BlockInputStreams input_streams;
 
-			for (const auto & file : files)
+			for (const auto & file : files.files)
 			{
 				temporary_inputs.emplace_back(new TemporaryFileStream(file->path()));
 				input_streams.emplace_back(temporary_inputs.back()->block_in);
 			}
 
-			LOG_TRACE(log, "Will merge " << files.size() << " temporary files.");
+			LOG_TRACE(log, "Will merge " << files.files.size() << " temporary files of size "
+				<< (files.sum_size_compressed / 1048576.0) << " MiB compressed, "
+				<< (files.sum_size_uncompressed / 1048576.0) << " MiB uncompressed.");
+
 			impl.reset(new MergingAggregatedMemoryEfficientBlockInputStream(input_streams, params, final));
 		}
 	}
diff --git a/dbms/include/DB/Interpreters/Aggregator.h b/dbms/include/DB/Interpreters/Aggregator.h
index ae1fea6fe59..da6778be79c 100644
--- a/dbms/include/DB/Interpreters/Aggregator.h
+++ b/dbms/include/DB/Interpreters/Aggregator.h
@@ -877,11 +877,18 @@ public:
 	/// For IBlockInputStream.
 	String getID() const;
 
+	/// For external aggregation.
 	void writeToTemporaryFile(AggregatedDataVariants & data_variants, size_t rows);
 
-	bool hasTemporaryFiles() const { return !temporary_files.empty(); }
+	bool hasTemporaryFiles() const { return !temporary_files.files.empty(); }
 
-	using TemporaryFiles = std::vector<std::unique_ptr<Poco::TemporaryFile>>;
+	struct TemporaryFiles
+	{
+		std::vector<std::unique_ptr<Poco::TemporaryFile>> files;
+		size_t sum_size_uncompressed = 0;
+		size_t sum_size_compressed = 0;
+		std::mutex mutex;
+	};
 
 	const TemporaryFiles & getTemporaryFiles() const { return temporary_files; }
@@ -951,7 +958,6 @@ protected:
 
 	/// For external aggregation.
 	TemporaryFiles temporary_files;
-	std::mutex temporary_files_mutex;
 
 	/** If only column names are given (key_names, and also aggregates[i].column_name), then compute the column numbers.
 	  * Form a block - a sample of the result.
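The Aggregator.h change above is the heart of the patch: the bare vector of temporary files becomes a TemporaryFiles struct that bundles the file list, running compressed and uncompressed byte totals, and the mutex guarding them, so a writer updates all of them under one lock and the readers in the two input streams can log sizes next to the file count. Below is a minimal, self-contained sketch of that pattern; TempFile and registerSpilledFile are hypothetical stand-ins for Poco::TemporaryFile and the bookkeeping inside Aggregator::writeToTemporaryFile, not actual ClickHouse code.

    #include <cstddef>
    #include <iostream>
    #include <memory>
    #include <mutex>
    #include <string>
    #include <utility>
    #include <vector>

    /// Hypothetical stand-in for Poco::TemporaryFile: anything owning a path.
    struct TempFile
    {
        std::string path;
    };

    struct TemporaryFiles
    {
        std::vector<std::unique_ptr<TempFile>> files;
        size_t sum_size_uncompressed = 0;
        size_t sum_size_compressed = 0;
        std::mutex mutex;
    };

    /// Called by each aggregating thread after it finishes writing a spill
    /// file. The list and both counters are updated under a single lock,
    /// so the totals can never disagree with the number of files.
    void registerSpilledFile(TemporaryFiles & temporary_files, std::unique_ptr<TempFile> file,
        size_t compressed_bytes, size_t uncompressed_bytes)
    {
        std::lock_guard<std::mutex> lock(temporary_files.mutex);
        temporary_files.files.emplace_back(std::move(file));
        temporary_files.sum_size_uncompressed += uncompressed_bytes;
        temporary_files.sum_size_compressed += compressed_bytes;
    }

    int main()
    {
        TemporaryFiles temporary_files;
        registerSpilledFile(temporary_files, std::unique_ptr<TempFile>(new TempFile{"/tmp/part1.bin"}), 40UL << 20, 100UL << 20);
        registerSpilledFile(temporary_files, std::unique_ptr<TempFile>(new TempFile{"/tmp/part2.bin"}), 35UL << 20, 90UL << 20);

        /// Mirrors the log message added by this patch.
        std::cout << "Will merge " << temporary_files.files.size() << " temporary files of size "
            << (temporary_files.sum_size_compressed / 1048576.0) << " MiB compressed, "
            << (temporary_files.sum_size_uncompressed / 1048576.0) << " MiB uncompressed.\n";
    }

Keeping the counters inside the same struct as the mutex makes it hard to update one without the other, which is presumably why the patch retires the free-standing temporary_files_mutex.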
diff --git a/dbms/src/DataStreams/AggregatingBlockInputStream.cpp b/dbms/src/DataStreams/AggregatingBlockInputStream.cpp
index 175f7dc6e32..d7ff0fe4146 100644
--- a/dbms/src/DataStreams/AggregatingBlockInputStream.cpp
+++ b/dbms/src/DataStreams/AggregatingBlockInputStream.cpp
@@ -37,13 +37,16 @@ Block AggregatingBlockInputStream::readImpl()
 		const auto & files = aggregator.getTemporaryFiles();
 		BlockInputStreams input_streams;
 
-		for (const auto & file : files)
+		for (const auto & file : files.files)
 		{
 			temporary_inputs.emplace_back(new TemporaryFileStream(file->path()));
 			input_streams.emplace_back(temporary_inputs.back()->block_in);
 		}
 
-		LOG_TRACE(log, "Will merge " << files.size() << " temporary files.");
+		LOG_TRACE(log, "Will merge " << files.files.size() << " temporary files of size "
+			<< (files.sum_size_compressed / 1048576.0) << " MiB compressed, "
+			<< (files.sum_size_uncompressed / 1048576.0) << " MiB uncompressed.");
+
 		impl.reset(new MergingAggregatedMemoryEfficientBlockInputStream(input_streams, params, final));
 	}
 }
diff --git a/dbms/src/Interpreters/Aggregator.cpp b/dbms/src/Interpreters/Aggregator.cpp
index a479ed30c84..3fb4ea3e842 100644
--- a/dbms/src/Interpreters/Aggregator.cpp
+++ b/dbms/src/Interpreters/Aggregator.cpp
@@ -795,15 +795,17 @@ void Aggregator::writeToTemporaryFile(AggregatedDataVariants & data_variants, si
 	compressed_buf.next();
 	file_buf.next();
 
-	{
-		std::lock_guard<std::mutex> lock(temporary_files_mutex);
-		temporary_files.emplace_back(std::move(file));
-	}
-
 	double elapsed_seconds = watch.elapsedSeconds();
 	double compressed_bytes = file_buf.count();
 	double uncompressed_bytes = compressed_buf.count();
 
+	{
+		std::lock_guard<std::mutex> lock(temporary_files.mutex);
+		temporary_files.files.emplace_back(std::move(file));
+		temporary_files.sum_size_uncompressed += uncompressed_bytes;
+		temporary_files.sum_size_compressed += compressed_bytes;
+	}
+
 	LOG_TRACE(log, std::fixed << std::setprecision(3)
 		<< "Written part in " << elapsed_seconds << " sec., "
 		<< rows << " rows, "
@@ -857,6 +859,9 @@ void Aggregator::writeToTemporaryFileImpl(
 			max_temporary_block_size_bytes = block_size_bytes;
 	}
 
+	/// data_variants will no longer destroy the aggregate function states in its destructor. The states are now owned by ColumnAggregateFunction.
+	data_variants.aggregator = nullptr;
+
 	LOG_TRACE(log, std::fixed << std::setprecision(3)
 		<< "Max size of temporary block: " << max_temporary_block_size_rows << " rows, "
 		<< (max_temporary_block_size_bytes / 1048576.0) << " MiB.");
diff --git a/dbms/tests/queries/0_stateless/00284_external_aggregation.reference b/dbms/tests/queries/0_stateless/00284_external_aggregation.reference
new file mode 100644
index 00000000000..48e30e781e0
--- /dev/null
+++ b/dbms/tests/queries/0_stateless/00284_external_aggregation.reference
@@ -0,0 +1,2 @@
+49999995000000 10000000
+499999500000 1000000 15
diff --git a/dbms/tests/queries/0_stateless/00284_external_aggregation.sql b/dbms/tests/queries/0_stateless/00284_external_aggregation.sql
new file mode 100644
index 00000000000..0595b81a0f7
--- /dev/null
+++ b/dbms/tests/queries/0_stateless/00284_external_aggregation.sql
@@ -0,0 +1,5 @@
+SET max_bytes_before_external_group_by = 100000000;
+SET max_memory_usage = 200000000;
+
+SELECT sum(k), sum(c) FROM (SELECT number AS k, count() AS c FROM (SELECT * FROM system.numbers LIMIT 10000000) GROUP BY k);
+SELECT sum(k), sum(c), max(u) FROM (SELECT number AS k, count() AS c, uniqArray(range(number % 16)) AS u FROM (SELECT * FROM system.numbers LIMIT 1000000) GROUP BY k);
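The reference values can be checked by hand, which is worth doing since external aggregation must not change results. In the first query the keys are 0..9999999, so sum(k) = 10000000 * 9999999 / 2 = 49999995000000, and since each key occurs exactly once, sum(c) = 10000000. In the second query, sum(k) = 1000000 * 999999 / 2 = 499999500000 and sum(c) = 1000000; assuming uniqArray counts distinct elements across its array argument, u is largest when number % 16 = 15, where range(15) yields the 15 distinct values 0..14, hence max(u) = 15. The settings are chosen to force the spill path: with max_bytes_before_external_group_by = 100000000 (about 95 MiB), aggregation state is flushed to temporary files well before the 200000000-byte max_memory_usage limit would abort the query.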