This commit is contained in:
vdimir 2024-10-17 15:33:33 +00:00
parent f238530cc5
commit 017d9557c5
No known key found for this signature in database
GPG Key ID: 6EE4CE2BEDC51862
2 changed files with 15 additions and 4 deletions

View File

@ -1519,7 +1519,10 @@ void Aggregator::writeToTemporaryFile(AggregatedDataVariants & data_variants, si
Stopwatch watch;
size_t rows = data_variants.size();
std::unique_lock lk(tmp_files_mutex);
auto & out_stream = tmp_files.emplace_back(getHeader(false), tmp_data.get(), max_temp_file_size);
lk.unlock();
ProfileEvents::increment(ProfileEvents::ExternalAggregationWritePart);
LOG_DEBUG(log, "Writing part of aggregation data into temporary file {}", out_stream.getHolder()->describeFilePath());
@ -1639,11 +1642,18 @@ Block Aggregator::convertOneBucketToBlock(AggregatedDataVariants & variants, Are
return block;
}
std::vector<TemporaryBlockStreamHolder> & Aggregator::getTemporaryData()
std::list<TemporaryBlockStreamHolder> & Aggregator::getTemporaryData()
{
return tmp_files;
}
bool Aggregator::hasTemporaryData() const
{
std::lock_guard lk(tmp_files_mutex);
return !tmp_files.empty();
}
template <typename Method>
void Aggregator::writeToTemporaryFileImpl(
AggregatedDataVariants & data_variants,

View File

@ -309,9 +309,9 @@ public:
/// For external aggregation.
void writeToTemporaryFile(AggregatedDataVariants & data_variants, size_t max_temp_file_size = 0) const;
bool hasTemporaryData() const { return !tmp_files.empty(); }
bool hasTemporaryData() const;
std::vector<TemporaryBlockStreamHolder> & getTemporaryData();
std::list<TemporaryBlockStreamHolder> & getTemporaryData();
/// Get data structure of the result.
Block getHeader(bool final) const;
@ -356,7 +356,8 @@ private:
/// For external aggregation.
TemporaryDataOnDiskScopePtr tmp_data;
mutable std::vector<TemporaryBlockStreamHolder> tmp_files;
mutable std::mutex tmp_files_mutex;
mutable std::list<TemporaryBlockStreamHolder> tmp_files;
size_t min_bytes_for_prefetch = 0;