diff --git a/src/Compression/CompressedReadBufferBase.cpp b/src/Compression/CompressedReadBufferBase.cpp index 30e5d0c6655..244450414ba 100644 --- a/src/Compression/CompressedReadBufferBase.cpp +++ b/src/Compression/CompressedReadBufferBase.cpp @@ -151,8 +151,6 @@ static void readHeaderAndGetCodecAndSize( if (size_compressed_without_checksum < header_size) throw Exception("Can't decompress data: the compressed data size (" + toString(size_compressed_without_checksum) + ", this should include header size) is less than the header size (" + toString(header_size) + ")", ErrorCodes::CORRUPTED_DATA); - - ProfileEvents::increment(ProfileEvents::ReadCompressedBytes, size_compressed_without_checksum + sizeof(Checksum)); } /// Read compressed data into compressed_buffer. Get size of decompressed data from block header. Checksum if need. @@ -199,6 +197,7 @@ size_t CompressedReadBufferBase::readCompressedData(size_t & size_decompressed, validateChecksum(compressed_buffer, size_compressed_without_checksum, checksum); } + ProfileEvents::increment(ProfileEvents::ReadCompressedBytes, size_compressed_without_checksum + sizeof(Checksum)); return size_compressed_without_checksum + sizeof(Checksum); } @@ -231,11 +230,14 @@ size_t CompressedReadBufferBase::readCompressedDataBlockForAsynchronous(size_t & compressed_in->position() -= header_size; compressed_buffer = compressed_in->position(); compressed_in->position() += size_compressed_without_checksum; + if (!disable_checksum) { Checksum & checksum = *reinterpret_cast(own_compressed_buffer.data()); validateChecksum(compressed_buffer, size_compressed_without_checksum, checksum); } + + ProfileEvents::increment(ProfileEvents::ReadCompressedBytes, size_compressed_without_checksum + sizeof(Checksum)); return size_compressed_without_checksum + sizeof(Checksum); } else @@ -273,14 +275,12 @@ static void readHeaderAndGetCodec(const char * compressed_buffer, size_t size_de } } - void CompressedReadBufferBase::decompressTo(char * to, size_t size_decompressed, size_t size_compressed_without_checksum) { readHeaderAndGetCodec(compressed_buffer, size_decompressed, codec, allow_different_codecs); codec->decompress(compressed_buffer, size_compressed_without_checksum, to); } - void CompressedReadBufferBase::decompress(BufferBase::Buffer & to, size_t size_decompressed, size_t size_compressed_without_checksum) { readHeaderAndGetCodec(compressed_buffer, size_decompressed, codec, allow_different_codecs); diff --git a/src/Compression/CompressedReadBufferBase.h b/src/Compression/CompressedReadBufferBase.h index 0457d59058c..4d76fe72fdc 100644 --- a/src/Compression/CompressedReadBufferBase.h +++ b/src/Compression/CompressedReadBufferBase.h @@ -42,7 +42,7 @@ protected: /// Read compressed data into compressed_buffer for asynchronous decompression to avoid the situation of "read compressed block across the compressed_in". /// /// Compressed block may not be completely contained in "compressed_in" buffer which means compressed block may be read across the "compressed_in". - /// For native LZ4/ZSTD, it has no probem in facing situation above because they are sychronous. + /// For native LZ4/ZSTD, it has no problem in facing situation above because they are synchronous. /// But for asynchronous decompression, such as QPL deflate, it requires source and target buffer for decompression can not be overwritten until execution complete. /// /// Returns number of compressed bytes read. diff --git a/src/Compression/CompressionCodecDeflateQpl.cpp b/src/Compression/CompressionCodecDeflateQpl.cpp index 7d566e14a87..81ec7ee5dca 100644 --- a/src/Compression/CompressionCodecDeflateQpl.cpp +++ b/src/Compression/CompressionCodecDeflateQpl.cpp @@ -109,11 +109,11 @@ bool DeflateQplJobHWPool::tryLockJob(UInt32 index) return hw_job_ptr_locks[index].compare_exchange_strong(expected, true); } - void DeflateQplJobHWPool::unLockJob(UInt32 index) - { +void DeflateQplJobHWPool::unLockJob(UInt32 index) +{ assert(index < MAX_HW_JOB_NUMBER); hw_job_ptr_locks[index].store(false); - } +} //HardwareCodecDeflateQpl HardwareCodecDeflateQpl::HardwareCodecDeflateQpl() @@ -374,7 +374,7 @@ void CompressionCodecDeflateQpl::doDecompressData(const char * source, UInt32 so if (DeflateQplJobHWPool::instance().isJobPoolReady()) { res = hw_codec->doDecompressDataSynchronous(source, source_size, dest, uncompressed_size); - if(res == HardwareCodecDeflateQpl::RET_ERROR) + if (res == HardwareCodecDeflateQpl::RET_ERROR) sw_codec->doDecompressData(source, source_size, dest, uncompressed_size); } else diff --git a/src/Compression/CompressionCodecDeflateQpl.h b/src/Compression/CompressionCodecDeflateQpl.h index 125f35eb015..c15f537fd3f 100644 --- a/src/Compression/CompressionCodecDeflateQpl.h +++ b/src/Compression/CompressionCodecDeflateQpl.h @@ -12,7 +12,8 @@ class Logger; namespace DB { -/// DeflateQplJobHWPool is resource pool for provide the job objects which is required to save context infomation during offload asynchronous compression to IAA. +/// DeflateQplJobHWPool is resource pool to provide the job objects. +/// Job object is used for storing context information during offloading compression job to HW Accelerator. class DeflateQplJobHWPool { public: diff --git a/src/Compression/ICompressionCodec.h b/src/Compression/ICompressionCodec.h index 51d08ae8f33..f40404a84f3 100644 --- a/src/Compression/ICompressionCodec.h +++ b/src/Compression/ICompressionCodec.h @@ -52,7 +52,7 @@ public: /// Synchronous mode which is commonly used by default; /// --- For the codec with HW decompressor, it means submit request to HW and busy wait till complete. /// Asynchronous mode which required HW decompressor support; - /// --- For the codec with HW decompressor, it means submit request to HW and return immeditately. + /// --- For the codec with HW decompressor, it means submit request to HW and return immediately. /// --- Must be used in pair with flushAsynchronousDecompressRequests. /// SoftwareFallback mode is exclusively defined for the codec with HW decompressor, enable its capability of "fallback to SW codec". enum class CodecMode