Use getCompressedBytes in BufferingToFileTransform and TemporaryFileStream

This commit is contained in:
vdimir 2022-08-18 10:03:25 +00:00
parent 1e24a731f7
commit 0349c85017
No known key found for this signature in database
GPG Key ID: 6EE4CE2BEDC51862
3 changed files with 16 additions and 19 deletions

View File

@ -39,10 +39,7 @@ TemporaryFileStream::Stat TemporaryFileStream::write(const std::string & path, c
output.write(block); output.write(block);
compressed_buf.finalize(); compressed_buf.finalize();
output.flush(); return Stat{compressed_buf.getCompressedBytes(), compressed_buf.getUncompressedBytes()};
file_buf.next();
return Stat{compressed_buf.count(), file_buf.count()};
} }
} }

View File

@ -14,8 +14,8 @@ struct TemporaryFileStream
{ {
struct Stat struct Stat
{ {
size_t uncompressed_bytes = 0;
size_t compressed_bytes = 0; size_t compressed_bytes = 0;
size_t uncompressed_bytes = 0;
}; };
ReadBufferFromFile file_in; ReadBufferFromFile file_in;

View File

@ -63,7 +63,7 @@ public:
compressed_buf_out.next(); compressed_buf_out.next();
file_buf_out.next(); file_buf_out.next();
updateStat(); auto stat = updateWriteStat();
LOG_INFO(log, "Done writing part of data into temporary file {}, compressed {}, uncompressed {} ", LOG_INFO(log, "Done writing part of data into temporary file {}, compressed {}, uncompressed {} ",
path, ReadableSize(static_cast<double>(stat.compressed_size)), ReadableSize(static_cast<double>(stat.uncompressed_size))); path, ReadableSize(static_cast<double>(stat.compressed_size)), ReadableSize(static_cast<double>(stat.uncompressed_size)));
@ -90,16 +90,22 @@ public:
} }
private: private:
void updateStat() struct Stat
{ {
stat.compressed_size = file_buf_out.count(); size_t compressed_size = 0;
stat.uncompressed_size = compressed_buf_out.count(); size_t uncompressed_size = 0;
};
ProfileEvents::increment(ProfileEvents::ExternalProcessingCompressedBytesTotal, stat.compressed_size); Stat updateWriteStat()
ProfileEvents::increment(ProfileEvents::ExternalProcessingUncompressedBytesTotal, stat.uncompressed_size); {
Stat res{compressed_buf_out.getCompressedBytes(), compressed_buf_out.getUncompressedBytes()};
ProfileEvents::increment(ProfileEvents::ExternalSortCompressedBytes, stat.compressed_size); ProfileEvents::increment(ProfileEvents::ExternalProcessingCompressedBytesTotal, res.compressed_size);
ProfileEvents::increment(ProfileEvents::ExternalSortUncompressedBytes, stat.uncompressed_size); ProfileEvents::increment(ProfileEvents::ExternalProcessingUncompressedBytesTotal, res.uncompressed_size);
ProfileEvents::increment(ProfileEvents::ExternalSortCompressedBytes, res.compressed_size);
ProfileEvents::increment(ProfileEvents::ExternalSortUncompressedBytes, res.uncompressed_size);
return res;
} }
Poco::Logger * log; Poco::Logger * log;
@ -111,12 +117,6 @@ private:
std::unique_ptr<ReadBufferFromFile> file_in; std::unique_ptr<ReadBufferFromFile> file_in;
std::unique_ptr<CompressedReadBuffer> compressed_in; std::unique_ptr<CompressedReadBuffer> compressed_in;
std::unique_ptr<NativeReader> block_in; std::unique_ptr<NativeReader> block_in;
struct
{
size_t compressed_size = 0;
size_t uncompressed_size = 0;
} stat;
}; };
MergeSortingTransform::MergeSortingTransform( MergeSortingTransform::MergeSortingTransform(