diff --git a/src/Formats/TemporaryFileStream.cpp b/src/Formats/TemporaryFileStream.cpp index e8dc832dbe1..f4c66b67a45 100644 --- a/src/Formats/TemporaryFileStream.cpp +++ b/src/Formats/TemporaryFileStream.cpp @@ -39,10 +39,7 @@ TemporaryFileStream::Stat TemporaryFileStream::write(const std::string & path, c output.write(block); compressed_buf.finalize(); - output.flush(); - file_buf.next(); - - return Stat{compressed_buf.count(), file_buf.count()}; + return Stat{compressed_buf.getCompressedBytes(), compressed_buf.getUncompressedBytes()}; } } diff --git a/src/Formats/TemporaryFileStream.h b/src/Formats/TemporaryFileStream.h index f61d928d204..e858da1dc33 100644 --- a/src/Formats/TemporaryFileStream.h +++ b/src/Formats/TemporaryFileStream.h @@ -14,8 +14,8 @@ struct TemporaryFileStream { struct Stat { - size_t uncompressed_bytes = 0; size_t compressed_bytes = 0; + size_t uncompressed_bytes = 0; }; ReadBufferFromFile file_in; diff --git a/src/Processors/Transforms/MergeSortingTransform.cpp b/src/Processors/Transforms/MergeSortingTransform.cpp index 7d632654964..001c33baa4a 100644 --- a/src/Processors/Transforms/MergeSortingTransform.cpp +++ b/src/Processors/Transforms/MergeSortingTransform.cpp @@ -63,7 +63,7 @@ public: compressed_buf_out.next(); file_buf_out.next(); - updateStat(); + auto stat = updateWriteStat(); LOG_INFO(log, "Done writing part of data into temporary file {}, compressed {}, uncompressed {} ", path, ReadableSize(static_cast(stat.compressed_size)), ReadableSize(static_cast(stat.uncompressed_size))); @@ -90,16 +90,22 @@ public: } private: - void updateStat() + struct Stat { - stat.compressed_size = file_buf_out.count(); - stat.uncompressed_size = compressed_buf_out.count(); + size_t compressed_size = 0; + size_t uncompressed_size = 0; + }; - ProfileEvents::increment(ProfileEvents::ExternalProcessingCompressedBytesTotal, stat.compressed_size); - ProfileEvents::increment(ProfileEvents::ExternalProcessingUncompressedBytesTotal, stat.uncompressed_size); + Stat updateWriteStat() + { + Stat res{compressed_buf_out.getCompressedBytes(), compressed_buf_out.getUncompressedBytes()}; - ProfileEvents::increment(ProfileEvents::ExternalSortCompressedBytes, stat.compressed_size); - ProfileEvents::increment(ProfileEvents::ExternalSortUncompressedBytes, stat.uncompressed_size); + ProfileEvents::increment(ProfileEvents::ExternalProcessingCompressedBytesTotal, res.compressed_size); + ProfileEvents::increment(ProfileEvents::ExternalProcessingUncompressedBytesTotal, res.uncompressed_size); + + ProfileEvents::increment(ProfileEvents::ExternalSortCompressedBytes, res.compressed_size); + ProfileEvents::increment(ProfileEvents::ExternalSortUncompressedBytes, res.uncompressed_size); + return res; } Poco::Logger * log; @@ -111,12 +117,6 @@ private: std::unique_ptr file_in; std::unique_ptr compressed_in; std::unique_ptr block_in; - - struct - { - size_t compressed_size = 0; - size_t uncompressed_size = 0; - } stat; }; MergeSortingTransform::MergeSortingTransform(