dbms: external aggregation: development [#METR-17000].

Alexey Milovidov 2015-12-01 19:58:15 +03:00
parent fe8f947a43
commit 8844334423
6 changed files with 36 additions and 12 deletions

View File

@@ -112,13 +112,16 @@ protected:
 		const auto & files = aggregator.getTemporaryFiles();
 		BlockInputStreams input_streams;
-		for (const auto & file : files)
+		for (const auto & file : files.files)
 		{
 			temporary_inputs.emplace_back(new TemporaryFileStream(file->path()));
 			input_streams.emplace_back(temporary_inputs.back()->block_in);
 		}
-		LOG_TRACE(log, "Will merge " << files.size() << " temporary files.");
+		LOG_TRACE(log, "Will merge " << files.files.size() << " temporary files of size "
+			<< (files.sum_size_compressed / 1048576.0) << " MiB compressed, "
+			<< (files.sum_size_uncompressed / 1048576.0) << " MiB uncompressed.");
 		impl.reset(new MergingAggregatedMemoryEfficientBlockInputStream(input_streams, params, final));
 	}
 }
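For context, MergingAggregatedMemoryEfficientBlockInputStream merges the partially aggregated blocks read back from the spilled files; for two-level aggregation data it can proceed bucket by bucket, so the complete result never has to sit in memory at once. Below is a toy, self-contained sketch of the merge step only, assuming a made-up plain-text "key partial_sum" file format rather than the real compressed native blocks:

    // Toy sketch of merging spilled partial aggregates (NOT the real stream
    // classes): each spilled file holds "key partial_sum" pairs in plain text;
    // merging re-aggregates them across files.
    #include <cstdint>
    #include <fstream>
    #include <iostream>
    #include <map>
    #include <string>
    #include <vector>

    int main()
    {
        // Hypothetical paths; in the commit these come from aggregator.getTemporaryFiles().files.
        std::vector<std::string> paths = {"part0.txt", "part1.txt"};

        std::map<int64_t, int64_t> merged;  // key -> combined partial sums

        for (const auto & path : paths)
        {
            std::ifstream in(path);
            int64_t key;
            int64_t partial_sum;
            while (in >> key >> partial_sum)
                merged[key] += partial_sum;  // combine the states from every spilled part
        }

        for (const auto & [key, sum] : merged)
            std::cout << key << '\t' << sum << '\n';
    }

The real streams differ in every detail, but the shape is the same: one reader per spilled file, and re-aggregation of partial states across all readers.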

View File

@@ -877,11 +877,18 @@ public:
 	/// For IBlockInputStream.
 	String getID() const;

 	/// For external aggregation.
 	void writeToTemporaryFile(AggregatedDataVariants & data_variants, size_t rows);

-	bool hasTemporaryFiles() const { return !temporary_files.empty(); }
+	bool hasTemporaryFiles() const { return !temporary_files.files.empty(); }

-	using TemporaryFiles = std::vector<std::unique_ptr<Poco::TemporaryFile>>;
+	struct TemporaryFiles
+	{
+		std::vector<std::unique_ptr<Poco::TemporaryFile>> files;
+		size_t sum_size_uncompressed = 0;
+		size_t sum_size_compressed = 0;
+		std::mutex mutex;
+	};

 	const TemporaryFiles & getTemporaryFiles() const { return temporary_files; }
@@ -951,7 +958,6 @@ protected:
 	/// For external aggregation.
 	TemporaryFiles temporary_files;
-	std::mutex temporary_files_mutex;

 	/** If only the column names were specified (key_names, as well as aggregates[i].column_name), calculate the column numbers.
 	  * Create a block, a sample of the result.
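The header change above replaces the old using TemporaryFiles = std::vector<std::unique_ptr<Poco::TemporaryFile>> alias with a struct that bundles the files, the running size totals, and the mutex guarding them, which is why the standalone temporary_files_mutex member disappears in the second hunk. A minimal compilable sketch of the pattern, where File stands in for Poco::TemporaryFile and addFile() is a hypothetical helper (the commit does the locking inline in writeToTemporaryFile):

    #include <cstddef>
    #include <memory>
    #include <mutex>
    #include <string>
    #include <utility>
    #include <vector>

    struct File { std::string path; };  // stand-in for Poco::TemporaryFile

    struct TemporaryFiles
    {
        std::vector<std::unique_ptr<File>> files;
        size_t sum_size_uncompressed = 0;
        size_t sum_size_compressed = 0;
        std::mutex mutex;  // guards all three fields above
    };

    // Register one finished spill file (mirrors Aggregator::writeToTemporaryFile).
    void addFile(TemporaryFiles & t, std::unique_ptr<File> f,
                 size_t compressed_bytes, size_t uncompressed_bytes)
    {
        std::lock_guard<std::mutex> lock(t.mutex);
        t.files.emplace_back(std::move(f));
        t.sum_size_compressed += compressed_bytes;
        t.sum_size_uncompressed += uncompressed_bytes;
    }

    int main()
    {
        TemporaryFiles t;
        addFile(t, std::make_unique<File>(File{"/tmp/spill0"}), 1 << 20, 4 << 20);
    }

Keeping the mutex inside the struct ties the lock to exactly the data it protects, so any reader of the totals knows which lock to take.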

View File

@@ -37,13 +37,16 @@ Block AggregatingBlockInputStream::readImpl()
 		const auto & files = aggregator.getTemporaryFiles();
 		BlockInputStreams input_streams;
-		for (const auto & file : files)
+		for (const auto & file : files.files)
 		{
 			temporary_inputs.emplace_back(new TemporaryFileStream(file->path()));
 			input_streams.emplace_back(temporary_inputs.back()->block_in);
 		}
-		LOG_TRACE(log, "Will merge " << files.size() << " temporary files.");
+		LOG_TRACE(log, "Will merge " << files.files.size() << " temporary files of size "
+			<< (files.sum_size_compressed / 1048576.0) << " MiB compressed, "
+			<< (files.sum_size_uncompressed / 1048576.0) << " MiB uncompressed.");
 		impl.reset(new MergingAggregatedMemoryEfficientBlockInputStream(input_streams, params, final));
 	}
 }

View File

@@ -795,15 +795,17 @@ void Aggregator::writeToTemporaryFile(AggregatedDataVariants & data_variants, size_t rows)
 	compressed_buf.next();
 	file_buf.next();

-	{
-		std::lock_guard<std::mutex> lock(temporary_files_mutex);
-		temporary_files.emplace_back(std::move(file));
-	}

 	double elapsed_seconds = watch.elapsedSeconds();
 	double compressed_bytes = file_buf.count();
 	double uncompressed_bytes = compressed_buf.count();

+	{
+		std::lock_guard<std::mutex> lock(temporary_files.mutex);
+		temporary_files.files.emplace_back(std::move(file));
+		temporary_files.sum_size_uncompressed += uncompressed_bytes;
+		temporary_files.sum_size_compressed += compressed_bytes;
+	}

 	LOG_TRACE(log, std::fixed << std::setprecision(3)
 		<< "Written part in " << elapsed_seconds << " sec., "
 		<< rows << " rows, "
@@ -857,6 +859,9 @@ void Aggregator::writeToTemporaryFileImpl(
 			max_temporary_block_size_bytes = block_size_bytes;
 	}

+	/// data_variants will not destroy the aggregate function states in its destructor. The states are now owned by ColumnAggregateFunction.
+	data_variants.aggregator = nullptr;
+
 	LOG_TRACE(log, std::fixed << std::setprecision(3)
 		<< "Max size of temporary block: " << max_temporary_block_size_rows << " rows, "
 		<< (max_temporary_block_size_bytes / 1048576.0) << " MiB.");
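The new data_variants.aggregator = nullptr; line is the disarm-the-destructor idiom described in the comment above: once the blocks are written out, the aggregate function states are owned by ColumnAggregateFunction, and data_variants must not free them a second time. A toy model of the idiom (the real classes are far richer):

    #include <iostream>
    #include <vector>

    struct Aggregator {};          // stand-in for DB::Aggregator
    struct State { int value; };   // stand-in for an aggregate function state

    struct Variants                // toy model of AggregatedDataVariants
    {
        Aggregator * aggregator = nullptr;  // non-null means "we still own the states"
        std::vector<State *> states;

        ~Variants()
        {
            if (aggregator)                 // still the owner: destroy the states
                for (State * s : states)
                    delete s;
        }
    };

    int main()
    {
        Aggregator agg;
        Variants v;
        v.aggregator = &agg;
        v.states.push_back(new State{42});

        // Hand the states to another owner (ColumnAggregateFunction in the commit) ...
        std::vector<State *> column_owned = v.states;
        // ... and disarm v's destructor so nothing is freed twice.
        v.aggregator = nullptr;

        std::cout << column_owned.front()->value << '\n';
        for (State * s : column_owned)      // the new owner cleans up
            delete s;
    }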

View File

@@ -0,0 +1,2 @@
+49999995000000 10000000
+499999500000 1000000 15

View File

@@ -0,0 +1,5 @@
+SET max_bytes_before_external_group_by = 100000000;
+SET max_memory_usage = 200000000;
+
+SELECT sum(k), sum(c) FROM (SELECT number AS k, count() AS c FROM (SELECT * FROM system.numbers LIMIT 10000000) GROUP BY k);
+SELECT sum(k), sum(c), max(u) FROM (SELECT number AS k, count() AS c, uniqArray(range(number % 16)) AS u FROM (SELECT * FROM system.numbers LIMIT 1000000) GROUP BY k);
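The reference values in the previous file can be checked by hand. The first query produces 10^7 single-row groups, so sum(k) = 0 + 1 + ... + 9999999 = 10^7 * (10^7 - 1) / 2 = 49999995000000 and sum(c) = 10000000. The second uses 10^6 keys, giving sum(k) = 499999500000 and sum(c) = 1000000; since number % 16 peaks at 15, the largest array is range(15) with 15 distinct elements, hence max(u) = 15. Setting max_bytes_before_external_group_by well below max_memory_usage forces the GROUP BY to spill to disk instead of failing on the memory limit, which is exactly the new code path this test exercises.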