diff --git a/src/Interpreters/Aggregator.cpp b/src/Interpreters/Aggregator.cpp index bb9e22e5a1b..92aa831f233 100644 --- a/src/Interpreters/Aggregator.cpp +++ b/src/Interpreters/Aggregator.cpp @@ -1519,9 +1519,11 @@ void Aggregator::writeToTemporaryFile(AggregatedDataVariants & data_variants, si Stopwatch watch; size_t rows = data_variants.size(); - std::unique_lock lk(tmp_files_mutex); - auto & out_stream = tmp_files.emplace_back(getHeader(false), tmp_data.get(), max_temp_file_size); - lk.unlock(); + auto & out_stream = [this, max_temp_file_size]() -> TemporaryBlockStreamHolder & + { + std::lock_guard lk(tmp_files_mutex); + return tmp_files.emplace_back(getHeader(false), tmp_data.get(), max_temp_file_size); + }(); ProfileEvents::increment(ProfileEvents::ExternalAggregationWritePart); @@ -1642,9 +1644,10 @@ Block Aggregator::convertOneBucketToBlock(AggregatedDataVariants & variants, Are return block; } -std::list & Aggregator::getTemporaryData() +std::list Aggregator::detachTemporaryData() { - return tmp_files; + std::lock_guard lk(tmp_files_mutex); + return std::move(tmp_files); } bool Aggregator::hasTemporaryData() const diff --git a/src/Interpreters/Aggregator.h b/src/Interpreters/Aggregator.h index 451583946eb..eec64c171a6 100644 --- a/src/Interpreters/Aggregator.h +++ b/src/Interpreters/Aggregator.h @@ -311,7 +311,7 @@ public: bool hasTemporaryData() const; - std::list & getTemporaryData(); + std::list detachTemporaryData(); /// Get data structure of the result. Block getHeader(bool final) const; @@ -357,7 +357,7 @@ private: /// For external aggregation. TemporaryDataOnDiskScopePtr tmp_data; mutable std::mutex tmp_files_mutex; - mutable std::list tmp_files; + mutable std::list tmp_files TSA_GUARDED_BY(tmp_files_mutex); size_t min_bytes_for_prefetch = 0; diff --git a/src/Interpreters/tests/gtest_filecache.cpp b/src/Interpreters/tests/gtest_filecache.cpp index ae45443d4bd..60436604f70 100644 --- a/src/Interpreters/tests/gtest_filecache.cpp +++ b/src/Interpreters/tests/gtest_filecache.cpp @@ -988,7 +988,7 @@ try { TemporaryBlockStreamHolder stream(generateBlock(), tmp_data_scope.get()); ASSERT_TRUE(stream); - /// Do nothitng with stream, just create it and destroy. + /// Do nothing with stream, just create it and destroy. } { diff --git a/src/Processors/Transforms/AggregatingTransform.cpp b/src/Processors/Transforms/AggregatingTransform.cpp index 2c54788b995..21eec6c305a 100644 --- a/src/Processors/Transforms/AggregatingTransform.cpp +++ b/src/Processors/Transforms/AggregatingTransform.cpp @@ -813,10 +813,10 @@ void AggregatingTransform::initGenerate() /// Merge external data from all aggregators used in query. for (auto & aggregator : *params->aggregator_list_ptr) { - auto & tmp_data = aggregator.getTemporaryData(); - num_streams += tmp_data.size(); + tmp_files = aggregator.detachTemporaryData(); + num_streams += tmp_files.size(); - for (auto & tmp_stream : tmp_data) + for (auto & tmp_stream : tmp_files) { auto stat = tmp_stream.finishWriting(); compressed_size += stat.compressed_size; diff --git a/src/Processors/Transforms/AggregatingTransform.h b/src/Processors/Transforms/AggregatingTransform.h index b9212375c91..a7d18664786 100644 --- a/src/Processors/Transforms/AggregatingTransform.h +++ b/src/Processors/Transforms/AggregatingTransform.h @@ -216,6 +216,8 @@ private: RowsBeforeStepCounterPtr rows_before_aggregation; + std::list tmp_files; + void initGenerate(); }; diff --git a/tests/queries/0_stateless/02402_external_disk_mertrics.sql b/tests/queries/0_stateless/02402_external_disk_mertrics.sql index 7237ea19775..1a3bf9a230a 100644 --- a/tests/queries/0_stateless/02402_external_disk_mertrics.sql +++ b/tests/queries/0_stateless/02402_external_disk_mertrics.sql @@ -77,7 +77,8 @@ SELECT 'ok', 'fail: ' || toString(count()) || ' ' || toString(any(ProfileEvents)) ) - FROM system.query_log WHERE current_database = currentDatabase() + FROM system.query_log + WHERE current_database = currentDatabase() AND log_comment = '02402_external_disk_mertrics/join' AND query ILIKE 'SELECT%2097152%' AND type = 'QueryFinish';