mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-24 16:42:05 +00:00
remove compress sync interface
This commit is contained in:
parent
b2f98fa73b
commit
fe451a4317
@ -113,7 +113,7 @@ target_compile_options(isal_asm PUBLIC "-I${QPL_SRC_DIR}/isal/include/"
|
||||
|
||||
if (SANITIZE STREQUAL "undefined")
|
||||
get_target_property(target_options isal_asm COMPILE_OPTIONS)
|
||||
list(REMOVE_ITEM target_options "no-sanitize=undefined")
|
||||
list(REMOVE_ITEM target_options "-fno-sanitize=undefined")
|
||||
set_property(TARGET isal_asm PROPERTY COMPILE_OPTIONS ${target_options})
|
||||
endif()
|
||||
|
||||
|
@ -114,7 +114,6 @@ if (BUILD_STANDALONE_KEEPER)
|
||||
ch_contrib::nuraft
|
||||
ch_contrib::lz4
|
||||
ch_contrib::zstd
|
||||
ch_contrib::qpl
|
||||
ch_contrib::cityhash
|
||||
common ch_contrib::double_conversion
|
||||
ch_contrib::dragonbox_to_chars
|
||||
|
@ -57,14 +57,6 @@ HardwareCodecDeflate::~HardwareCodecDeflate()
|
||||
}
|
||||
jobDecompAsyncMap.clear();
|
||||
}
|
||||
if (!jobCompAsyncList.empty())
|
||||
{
|
||||
for (auto id : jobCompAsyncList)
|
||||
{
|
||||
DeflateJobHWPool::instance().releaseJob(id);
|
||||
}
|
||||
jobCompAsyncList.clear();
|
||||
}
|
||||
}
|
||||
uint32_t HardwareCodecDeflate::doCompressData(const char * source, uint32_t source_size, char * dest, uint32_t dest_size) const
|
||||
{
|
||||
@ -97,54 +89,6 @@ uint32_t HardwareCodecDeflate::doCompressData(const char * source, uint32_t sour
|
||||
return compressed_size;
|
||||
}
|
||||
|
||||
uint32_t HardwareCodecDeflate::doCompressDataReq(const char * source, uint32_t source_size, char * dest, uint32_t dest_size)
|
||||
{
|
||||
uint32_t job_id = 0;
|
||||
qpl_job * job_ptr = DeflateJobHWPool::instance().acquireJob(&job_id);
|
||||
if (job_ptr == nullptr)
|
||||
{
|
||||
LOG_WARNING(log, "HardwareCodecDeflate::doCompressDataReq acquireJob fail!");
|
||||
return 0;
|
||||
}
|
||||
qpl_status status;
|
||||
|
||||
job_ptr->op = qpl_op_compress;
|
||||
job_ptr->next_in_ptr = reinterpret_cast<uint8_t *>(const_cast<char *>(source));
|
||||
job_ptr->next_out_ptr = reinterpret_cast<uint8_t *>(dest);
|
||||
job_ptr->available_in = source_size;
|
||||
job_ptr->level = qpl_default_level;
|
||||
job_ptr->available_out = dest_size;
|
||||
job_ptr->flags = QPL_FLAG_FIRST | QPL_FLAG_DYNAMIC_HUFFMAN | QPL_FLAG_LAST | QPL_FLAG_OMIT_VERIFY;
|
||||
status = qpl_submit_job(job_ptr);
|
||||
if (QPL_STS_OK == status)
|
||||
{
|
||||
jobCompAsyncList.push_back(job_id);
|
||||
return job_id;
|
||||
}
|
||||
else
|
||||
{
|
||||
LOG_WARNING(log, "HardwareCodecDeflate::doCompressDataReq fail ->status: '{}' ", static_cast<size_t>(status));
|
||||
DeflateJobHWPool::instance().releaseJob(job_id);
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
uint32_t HardwareCodecDeflate::doCompressDataFlush(uint32_t req_id)
|
||||
{
|
||||
uint32_t compressed_size = 0;
|
||||
qpl_job * job_ptr = DeflateJobHWPool::instance().getJobPtr(req_id);
|
||||
if (nullptr != job_ptr)
|
||||
{
|
||||
while (QPL_STS_BEING_PROCESSED == qpl_check_job(job_ptr))
|
||||
{
|
||||
_tpause(1, __rdtsc() + 1000);
|
||||
}
|
||||
compressed_size = job_ptr->total_out;
|
||||
DeflateJobHWPool::instance().releaseJob(req_id);
|
||||
}
|
||||
return compressed_size;
|
||||
}
|
||||
|
||||
uint32_t HardwareCodecDeflate::doDecompressData(const char * source, uint32_t source_size, char * dest, uint32_t uncompressed_size) const
|
||||
{
|
||||
uint32_t job_id = 0;
|
||||
@ -377,27 +321,6 @@ uint32_t CompressionCodecDeflate::doCompressData(const char * source, uint32_t s
|
||||
return res;
|
||||
}
|
||||
|
||||
uint32_t CompressionCodecDeflate::doCompressDataReq(const char * source, uint32_t source_size, char * dest, uint32_t & req_id)
|
||||
{
|
||||
if (hwCodec->hwEnabled)
|
||||
req_id = hwCodec->doCompressDataReq(source, source_size, dest, getMaxCompressedDataSize(source_size));
|
||||
else
|
||||
req_id = 0;
|
||||
|
||||
if (0 == req_id)
|
||||
return swCodec->doCompressData(source, source_size, dest, getMaxCompressedDataSize(source_size));
|
||||
else
|
||||
return 0;
|
||||
}
|
||||
|
||||
uint32_t CompressionCodecDeflate::doCompressDataFlush(uint32_t req_id)
|
||||
{
|
||||
if (hwCodec->hwEnabled)
|
||||
return hwCodec->doCompressDataFlush(req_id);
|
||||
else
|
||||
return 0;
|
||||
}
|
||||
|
||||
void CompressionCodecDeflate::doDecompressData(const char * source, uint32_t source_size, char * dest, uint32_t uncompressed_size) const
|
||||
{
|
||||
uint32_t res = 0;
|
||||
|
@ -203,15 +203,12 @@ public:
|
||||
HardwareCodecDeflate();
|
||||
~HardwareCodecDeflate();
|
||||
uint32_t doCompressData(const char * source, uint32_t source_size, char * dest, uint32_t dest_size) const;
|
||||
uint32_t doCompressDataReq(const char * source, uint32_t source_size, char * dest, uint32_t dest_size);
|
||||
uint32_t doCompressDataFlush(uint32_t req_id);
|
||||
uint32_t doDecompressData(const char * source, uint32_t source_size, char * dest, uint32_t uncompressed_size) const;
|
||||
uint32_t doDecompressDataReq(const char * source, uint32_t source_size, char * dest, uint32_t uncompressed_size);
|
||||
void doDecompressDataFlush();
|
||||
|
||||
private:
|
||||
std::map<uint32_t, qpl_job *> jobDecompAsyncMap;
|
||||
std::vector<uint32_t> jobCompAsyncList;
|
||||
Poco::Logger * log;
|
||||
};
|
||||
class CompressionCodecDeflate : public ICompressionCodec
|
||||
@ -234,9 +231,6 @@ protected:
|
||||
}
|
||||
uint32_t doCompressData(const char * source, uint32_t source_size, char * dest) const override;
|
||||
uint32_t doCompressDataSW(const char * source, uint32_t source_size, char * dest) const;
|
||||
uint32_t doCompressDataReq(const char * source, uint32_t source_size, char * dest, uint32_t & req_id) override;
|
||||
uint32_t doCompressDataFlush(uint32_t req_id) override;
|
||||
|
||||
void doDecompressData(const char * source, uint32_t source_size, char * dest, uint32_t uncompressed_size) const override;
|
||||
void doDecompressDataReq(const char * source, uint32_t source_size, char * dest, uint32_t uncompressed_size) override;
|
||||
void doDecompressDataSW(const char * source, uint32_t source_size, char * dest, uint32_t uncompressed_size) const override;
|
||||
|
@ -90,33 +90,6 @@ UInt32 ICompressionCodec::compress(const char * source, UInt32 source_size, char
|
||||
unalignedStore<UInt32>(&dest[5], source_size);
|
||||
return header_size + compressed_bytes_written;
|
||||
}
|
||||
UInt32 ICompressionCodec::compressReq(const char * source, UInt32 source_size, char * dest, UInt32 & req_id)
|
||||
{
|
||||
assert(source != nullptr && dest != nullptr);
|
||||
dest[0] = getMethodByte();
|
||||
UInt8 header_size = getHeaderSize();
|
||||
|
||||
UInt32 res = doCompressDataReq(source, source_size, &dest[header_size], req_id);
|
||||
if (res > 0)
|
||||
{
|
||||
unalignedStore<UInt32>(&dest[1], res + header_size);
|
||||
unalignedStore<UInt32>(&dest[5], source_size);
|
||||
return header_size + res;
|
||||
}
|
||||
else
|
||||
{
|
||||
unalignedStore<UInt32>(&dest[5], source_size);
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
UInt32 ICompressionCodec::compressFlush(UInt32 req_id, char * dest)
|
||||
{
|
||||
UInt32 compressed_bytes_written = doCompressDataFlush(req_id);
|
||||
UInt8 header_size = getHeaderSize();
|
||||
unalignedStore<UInt32>(&dest[1], compressed_bytes_written + header_size);
|
||||
return header_size + compressed_bytes_written;
|
||||
}
|
||||
|
||||
UInt32 ICompressionCodec::decompress(const char * source, UInt32 source_size, char * dest, UInt8 req_type)
|
||||
{
|
||||
|
@ -44,9 +44,7 @@ public:
|
||||
|
||||
/// Compressed bytes from uncompressed source to dest. Dest should preallocate memory
|
||||
UInt32 compress(const char * source, UInt32 source_size, char * dest) const;
|
||||
UInt32 compressReq(const char * source, UInt32 source_size, char * dest, UInt32 & req_id);
|
||||
// Flush all asynchronous request for compression
|
||||
UInt32 compressFlush(UInt32 req_id, char * dest);
|
||||
|
||||
/// Decompress bytes from compressed source to dest. Dest should preallocate memory;
|
||||
// reqType is specific for HW decompressor:
|
||||
//0 means synchronous request by default;
|
||||
@ -110,19 +108,6 @@ protected:
|
||||
/// Actually compress data, without header
|
||||
virtual UInt32 doCompressData(const char * source, UInt32 source_size, char * dest) const = 0;
|
||||
|
||||
/// Asynchronous compression request to HW decompressor
|
||||
virtual UInt32 doCompressDataReq(const char * source, UInt32 source_size, char * dest, UInt32 & req_id)
|
||||
{
|
||||
req_id = 0;
|
||||
return doCompressData(source, source_size, dest);
|
||||
}
|
||||
|
||||
/// Flush asynchronous request for compression
|
||||
virtual UInt32 doCompressDataFlush(UInt32 req_id = 0)
|
||||
{
|
||||
return req_id;
|
||||
}
|
||||
|
||||
/// Actually decompress data without header
|
||||
virtual void doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size) const = 0;
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user