fix handling of overflow data during external aggregation [#CLICKHOUSE-3133]

This commit is contained in:
Alexey Zatelepin 2017-07-25 16:09:52 +03:00 committed by alexey-milovidov
parent cd5dc7ca29
commit 18672204e5
4 changed files with 30 additions and 19 deletions

View File

@ -43,9 +43,8 @@ Block AggregatingBlockInputStream::readImpl()
if (!isCancelled())
{
/// Flush data in the RAM to disk also. It's easier.
size_t rows = data_variants->sizeWithoutOverflowRow();
if (rows)
aggregator.writeToTemporaryFile(*data_variants, rows);
if (data_variants->size())
aggregator.writeToTemporaryFile(*data_variants);
}
const auto & files = aggregator.getTemporaryFiles();

View File

@ -139,9 +139,8 @@ void ParallelAggregatingBlockInputStream::Handler::onFinishThread(size_t thread_
if (data.isConvertibleToTwoLevel())
data.convertToTwoLevel();
size_t rows = data.sizeWithoutOverflowRow();
if (rows)
parent.aggregator.writeToTemporaryFile(data, rows);
if (data.size())
parent.aggregator.writeToTemporaryFile(data);
}
}
@ -156,9 +155,8 @@ void ParallelAggregatingBlockInputStream::Handler::onFinish()
if (data->isConvertibleToTwoLevel())
data->convertToTwoLevel();
size_t rows = data->sizeWithoutOverflowRow();
if (rows)
parent.aggregator.writeToTemporaryFile(*data, rows);
if (data->size())
parent.aggregator.writeToTemporaryFile(*data);
}
}
}

View File

@ -873,16 +873,17 @@ bool Aggregator::executeOnBlock(Block & block, AggregatedDataVariants & result,
&& current_memory_usage > static_cast<Int64>(params.max_bytes_before_external_group_by)
&& worth_convert_to_two_level)
{
writeToTemporaryFile(result, result_size);
writeToTemporaryFile(result);
}
return true;
}
void Aggregator::writeToTemporaryFile(AggregatedDataVariants & data_variants, size_t rows)
void Aggregator::writeToTemporaryFile(AggregatedDataVariants & data_variants)
{
Stopwatch watch;
size_t rows = data_variants.size();
auto file = std::make_unique<Poco::TemporaryFile>(params.tmp_path);
const std::string & path = file->path();
@ -893,7 +894,7 @@ void Aggregator::writeToTemporaryFile(AggregatedDataVariants & data_variants, si
LOG_DEBUG(log, "Writing part of aggregation data into temporary file " << path << ".");
ProfileEvents::increment(ProfileEvents::ExternalAggregationWritePart);
/// Flush only two-level data.
/// Flush only two-level data and possibly overflow data.
#define M(NAME) \
else if (data_variants.type == AggregatedDataVariants::Type::NAME) \
@ -909,6 +910,7 @@ void Aggregator::writeToTemporaryFile(AggregatedDataVariants & data_variants, si
data_variants.init(data_variants.type);
data_variants.aggregates_pools = Arenas(1, std::make_shared<Arena>());
data_variants.aggregates_pool = data_variants.aggregates_pools.back().get();
data_variants.without_key = nullptr;
block_out.flush();
compressed_buf.next();
@ -976,21 +978,33 @@ void Aggregator::writeToTemporaryFileImpl(
size_t max_temporary_block_size_rows = 0;
size_t max_temporary_block_size_bytes = 0;
for (size_t bucket = 0; bucket < Method::Data::NUM_BUCKETS; ++bucket)
auto update_max_sizes = [&](const Block & block)
{
Block block = convertOneBucketToBlock(data_variants, method, false, bucket);
out.write(block);
size_t block_size_rows = block.rows();
size_t block_size_bytes = block.bytes();
if (block_size_rows > max_temporary_block_size_rows)
max_temporary_block_size_rows = block.rows();
max_temporary_block_size_rows = block_size_rows;
if (block_size_bytes > max_temporary_block_size_bytes)
max_temporary_block_size_bytes = block_size_bytes;
};
for (size_t bucket = 0; bucket < Method::Data::NUM_BUCKETS; ++bucket)
{
Block block = convertOneBucketToBlock(data_variants, method, false, bucket);
out.write(block);
update_max_sizes(block);
}
/// `data_variants` will not destroy the states of aggregate functions in the destructor. Now the states own the ColumnAggregateFunction.
if (params.overflow_row)
{
Block block = prepareBlockAndFillWithoutKey(data_variants, false, true);
out.write(block);
update_max_sizes(block);
}
/// Pass ownership of the aggregate functions states:
/// `data_variants` will not destroy them in the destructor, they are now owned by ColumnAggregateFunction objects.
data_variants.aggregator = nullptr;
LOG_TRACE(log, std::fixed << std::setprecision(3)

View File

@ -1090,7 +1090,7 @@ public:
String getID() const;
/// For external aggregation.
void writeToTemporaryFile(AggregatedDataVariants & data_variants, size_t rows);
void writeToTemporaryFile(AggregatedDataVariants & data_variants);
bool hasTemporaryFiles() const { return !temporary_files.empty(); }