This commit is contained in:
vdimir 2024-11-13 13:10:23 +00:00
parent 60fa4be570
commit 8b340c864a
No known key found for this signature in database
GPG Key ID: 6EE4CE2BEDC51862
6 changed files with 18 additions and 12 deletions

View File

@ -1519,9 +1519,11 @@ void Aggregator::writeToTemporaryFile(AggregatedDataVariants & data_variants, si
Stopwatch watch;
size_t rows = data_variants.size();
std::unique_lock lk(tmp_files_mutex);
auto & out_stream = tmp_files.emplace_back(getHeader(false), tmp_data.get(), max_temp_file_size);
lk.unlock();
auto & out_stream = [this, max_temp_file_size]() -> TemporaryBlockStreamHolder &
{
std::lock_guard lk(tmp_files_mutex);
return tmp_files.emplace_back(getHeader(false), tmp_data.get(), max_temp_file_size);
}();
ProfileEvents::increment(ProfileEvents::ExternalAggregationWritePart);
@ -1642,9 +1644,10 @@ Block Aggregator::convertOneBucketToBlock(AggregatedDataVariants & variants, Are
return block;
}
std::list<TemporaryBlockStreamHolder> & Aggregator::getTemporaryData()
std::list<TemporaryBlockStreamHolder> Aggregator::detachTemporaryData()
{
return tmp_files;
std::lock_guard lk(tmp_files_mutex);
return std::move(tmp_files);
}
bool Aggregator::hasTemporaryData() const

View File

@ -311,7 +311,7 @@ public:
bool hasTemporaryData() const;
std::list<TemporaryBlockStreamHolder> & getTemporaryData();
std::list<TemporaryBlockStreamHolder> detachTemporaryData();
/// Get data structure of the result.
Block getHeader(bool final) const;
@ -357,7 +357,7 @@ private:
/// For external aggregation.
TemporaryDataOnDiskScopePtr tmp_data;
mutable std::mutex tmp_files_mutex;
mutable std::list<TemporaryBlockStreamHolder> tmp_files;
mutable std::list<TemporaryBlockStreamHolder> tmp_files TSA_GUARDED_BY(tmp_files_mutex);
size_t min_bytes_for_prefetch = 0;

View File

@ -988,7 +988,7 @@ try
{
TemporaryBlockStreamHolder stream(generateBlock(), tmp_data_scope.get());
ASSERT_TRUE(stream);
/// Do nothitng with stream, just create it and destroy.
/// Do nothing with stream, just create it and destroy.
}
{

View File

@ -813,10 +813,10 @@ void AggregatingTransform::initGenerate()
/// Merge external data from all aggregators used in query.
for (auto & aggregator : *params->aggregator_list_ptr)
{
auto & tmp_data = aggregator.getTemporaryData();
num_streams += tmp_data.size();
tmp_files = aggregator.detachTemporaryData();
num_streams += tmp_files.size();
for (auto & tmp_stream : tmp_data)
for (auto & tmp_stream : tmp_files)
{
auto stat = tmp_stream.finishWriting();
compressed_size += stat.compressed_size;

View File

@ -216,6 +216,8 @@ private:
RowsBeforeStepCounterPtr rows_before_aggregation;
std::list<TemporaryBlockStreamHolder> tmp_files;
void initGenerate();
};

View File

@ -77,7 +77,8 @@ SELECT
'ok',
'fail: ' || toString(count()) || ' ' || toString(any(ProfileEvents))
)
FROM system.query_log WHERE current_database = currentDatabase()
FROM system.query_log
WHERE current_database = currentDatabase()
AND log_comment = '02402_external_disk_mertrics/join'
AND query ILIKE 'SELECT%2097152%' AND type = 'QueryFinish';