mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-04 21:42:39 +00:00
restructure code design to be compatible both of hardware and software codec
This commit is contained in:
parent
2b684ff40e
commit
1236d74ae7
@ -1,11 +1,11 @@
|
||||
#include <thread>
|
||||
#include <cstdio>
|
||||
#include <thread>
|
||||
#include <Compression/CompressionCodecDeflate.h>
|
||||
#include <Compression/CompressionFactory.h>
|
||||
#include <Compression/CompressionInfo.h>
|
||||
#include <Parsers/ASTIdentifier.h>
|
||||
#include <Common/logger_useful.h>
|
||||
#include <Poco/Logger.h>
|
||||
#include <Common/logger_useful.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
@ -14,7 +14,7 @@ namespace ErrorCodes
|
||||
extern const int CANNOT_COMPRESS;
|
||||
extern const int CANNOT_DECOMPRESS;
|
||||
}
|
||||
|
||||
//DeflateJobHWPool
|
||||
qpl_job * DeflateJobHWPool::jobPool[jobPoolSize];
|
||||
std::atomic_bool DeflateJobHWPool::jobLock[jobPoolSize];
|
||||
|
||||
@ -29,31 +29,28 @@ DeflateJobHWPool::DeflateJobHWPool()
|
||||
log = &Poco::Logger::get("DeflateJobHWPool");
|
||||
if (initJobPool() < 0)
|
||||
{
|
||||
jobPoolAvailable = false;
|
||||
LOG_WARNING(log, "DeflateJobHWPool initializing fail! Please check if IAA hardware support.");
|
||||
jobPoolEnabled = false;
|
||||
LOG_WARNING(log, "DeflateJobHWPool is not ready. Please check if IAA hardware support.Auto switch to deflate software codec here");
|
||||
}
|
||||
else
|
||||
{
|
||||
jobPoolAvailable = true;
|
||||
}
|
||||
jobPoolEnabled = true;
|
||||
}
|
||||
DeflateJobHWPool::~DeflateJobHWPool()
|
||||
{
|
||||
destroyJobPool();
|
||||
}
|
||||
|
||||
CompressionCodecDeflate::CompressionCodecDeflate()
|
||||
//HardwareCodecDeflate
|
||||
HardwareCodecDeflate::HardwareCodecDeflate()
|
||||
{
|
||||
log = &Poco::Logger::get("CompressionCodecDeflate");
|
||||
setCodecDescription("DEFLATE");
|
||||
jobSWPtr = initSoftwareJobCodecPtr();
|
||||
log = &Poco::Logger::get("HardwareCodecDeflate");
|
||||
hwEnabled = DeflateJobHWPool::instance().jobPoolReady();
|
||||
}
|
||||
|
||||
CompressionCodecDeflate::~CompressionCodecDeflate()
|
||||
HardwareCodecDeflate::~HardwareCodecDeflate()
|
||||
{
|
||||
if (!jobDecompAsyncMap.empty())
|
||||
{
|
||||
LOG_ERROR(log, "Exception -> find un-released job when CompressionCodecDeflate destroy");
|
||||
LOG_WARNING(log, "Find un-released job when HardwareCodecDeflate destroy");
|
||||
for (auto it : jobDecompAsyncMap)
|
||||
{
|
||||
DeflateJobHWPool::instance().releaseJob(it.first);
|
||||
@ -69,84 +66,14 @@ CompressionCodecDeflate::~CompressionCodecDeflate()
|
||||
jobCompAsyncList.clear();
|
||||
}
|
||||
}
|
||||
|
||||
uint8_t CompressionCodecDeflate::getMethodByte() const
|
||||
{
|
||||
return static_cast<uint8_t>(CompressionMethodByte::Deflate);
|
||||
}
|
||||
|
||||
bool CompressionCodecDeflate::isAsyncSupported() const
|
||||
{
|
||||
return true;
|
||||
}
|
||||
|
||||
void CompressionCodecDeflate::updateHash(SipHash & hash) const
|
||||
{
|
||||
getCodecDesc()->updateTreeHash(hash);
|
||||
}
|
||||
|
||||
#define DEFLATE_COMPRESSBOUND(isize) ((isize) + ((isize) >> 12) + ((isize) >> 14) + ((isize) >> 25) + 13) //Aligned with ZLIB
|
||||
uint32_t CompressionCodecDeflate::getMaxCompressedDataSize(uint32_t uncompressed_size) const
|
||||
{
|
||||
return DEFLATE_COMPRESSBOUND(uncompressed_size);
|
||||
}
|
||||
|
||||
qpl_job * CompressionCodecDeflate::initSoftwareJobCodecPtr()
|
||||
{
|
||||
qpl_job * job_ptr;
|
||||
qpl_status status;
|
||||
uint32_t size = 0;
|
||||
std::unique_ptr<uint8_t[]> job_buffer;
|
||||
|
||||
// Job initialization
|
||||
status = qpl_get_job_size(qpl_path_software, &size);
|
||||
if (status != QPL_STS_OK)
|
||||
{
|
||||
throw Exception("initSoftwareJobCodecPtr: qpl_get_job_size fail:"+ std::to_string(status), ErrorCodes::CANNOT_COMPRESS);
|
||||
}
|
||||
|
||||
job_buffer = std::make_unique<uint8_t[]>(size);
|
||||
job_ptr = reinterpret_cast<qpl_job *>(job_buffer.get());
|
||||
|
||||
status = qpl_init_job(qpl_path_software, job_ptr);
|
||||
if (status != QPL_STS_OK)
|
||||
{
|
||||
throw Exception("initSoftwareJobCodecPtr: qpl_init_job fail:"+ std::to_string(status), ErrorCodes::CANNOT_COMPRESS);
|
||||
}
|
||||
return job_ptr;
|
||||
}
|
||||
|
||||
uint32_t CompressionCodecDeflate::doCompressDataSW(const char * source, uint32_t source_size, char * dest)const
|
||||
{
|
||||
qpl_status status;
|
||||
qpl_job * job_ptr = jobSWPtr;
|
||||
// Performing a compression operation
|
||||
job_ptr->op = qpl_op_compress;
|
||||
job_ptr->next_in_ptr = reinterpret_cast<uint8_t *>(const_cast<char *>(source));
|
||||
job_ptr->next_out_ptr = reinterpret_cast<uint8_t *>(dest);
|
||||
job_ptr->available_in = source_size;
|
||||
job_ptr->available_out = getMaxCompressedDataSize(source_size);
|
||||
job_ptr->level = qpl_high_level;
|
||||
job_ptr->flags = QPL_FLAG_FIRST | QPL_FLAG_DYNAMIC_HUFFMAN | QPL_FLAG_LAST | QPL_FLAG_OMIT_VERIFY;
|
||||
|
||||
// Compression
|
||||
status = qpl_execute_job(job_ptr);
|
||||
if (status != QPL_STS_OK)
|
||||
{
|
||||
throw Exception("doCompressDataSW cannot compress: qpl_execute_job fail:" + std::to_string(status), ErrorCodes::CANNOT_COMPRESS);
|
||||
}
|
||||
|
||||
return job_ptr->total_out;
|
||||
}
|
||||
|
||||
uint32_t CompressionCodecDeflate::doCompressData(const char * source, uint32_t source_size, char * dest) const
|
||||
uint32_t HardwareCodecDeflate::doCompressData(const char * source, uint32_t source_size, char * dest, uint32_t dest_size) const
|
||||
{
|
||||
uint32_t job_id = 0;
|
||||
qpl_job * job_ptr = DeflateJobHWPool::instance().acquireJob(&job_id);
|
||||
if (job_ptr == nullptr)
|
||||
{
|
||||
LOG_WARNING(log, "doCompressData HW acquireJob fail! switch to SW compress...");
|
||||
return doCompressDataSW(source, source_size, dest);
|
||||
LOG_WARNING(log, "HardwareCodecDeflate::doCompressData acquireJob fail!");
|
||||
return 0;
|
||||
}
|
||||
qpl_status status;
|
||||
uint32_t compressed_size = 0;
|
||||
@ -156,32 +83,28 @@ uint32_t CompressionCodecDeflate::doCompressData(const char * source, uint32_t s
|
||||
job_ptr->next_out_ptr = reinterpret_cast<uint8_t *>(dest);
|
||||
job_ptr->available_in = source_size;
|
||||
job_ptr->level = qpl_default_level;
|
||||
job_ptr->available_out = getMaxCompressedDataSize(source_size);
|
||||
job_ptr->available_out = dest_size;
|
||||
job_ptr->flags = QPL_FLAG_FIRST | QPL_FLAG_DYNAMIC_HUFFMAN | QPL_FLAG_LAST | QPL_FLAG_OMIT_VERIFY;
|
||||
// Compression
|
||||
status = qpl_execute_job(job_ptr);
|
||||
if (QPL_STS_OK == status)
|
||||
{
|
||||
compressed_size = job_ptr->total_out;
|
||||
}
|
||||
else
|
||||
{
|
||||
LOG_WARNING(log, "doCompressData HW fail! switch to SW compress ->status: '{}' ", static_cast<size_t>(status));
|
||||
compressed_size = doCompressDataSW(source, source_size, dest);
|
||||
}
|
||||
LOG_WARNING(log, "HardwareCodecDeflate::doCompressData fail ->status: '{}' ", static_cast<size_t>(status));
|
||||
|
||||
DeflateJobHWPool::instance().releaseJob(job_id);
|
||||
return compressed_size;
|
||||
}
|
||||
|
||||
UInt32 CompressionCodecDeflate::doCompressDataReq(const char * source, UInt32 source_size, char * dest, UInt32 & req_id)
|
||||
uint32_t HardwareCodecDeflate::doCompressDataReq(const char * source, uint32_t source_size, char * dest, uint32_t dest_size)
|
||||
{
|
||||
uint32_t job_id = 0;
|
||||
req_id = 0;
|
||||
qpl_job * job_ptr = DeflateJobHWPool::instance().acquireJob(&job_id);
|
||||
if (job_ptr == nullptr)
|
||||
{
|
||||
LOG_WARNING(log, "doCompressDataReq HW acquireJob fail! switch to SW compress...");
|
||||
return doCompressDataSW(source, source_size, dest);
|
||||
LOG_WARNING(log, "HardwareCodecDeflate::doCompressDataReq acquireJob fail!");
|
||||
return 0;
|
||||
}
|
||||
qpl_status status;
|
||||
|
||||
@ -190,27 +113,27 @@ UInt32 CompressionCodecDeflate::doCompressDataReq(const char * source, UInt32 so
|
||||
job_ptr->next_out_ptr = reinterpret_cast<uint8_t *>(dest);
|
||||
job_ptr->available_in = source_size;
|
||||
job_ptr->level = qpl_default_level;
|
||||
job_ptr->available_out = getMaxCompressedDataSize(source_size);
|
||||
job_ptr->available_out = dest_size;
|
||||
job_ptr->flags = QPL_FLAG_FIRST | QPL_FLAG_DYNAMIC_HUFFMAN | QPL_FLAG_LAST | QPL_FLAG_OMIT_VERIFY;
|
||||
// Compression
|
||||
status = qpl_submit_job(job_ptr);
|
||||
if (QPL_STS_OK != status)
|
||||
if (QPL_STS_OK == status)
|
||||
{
|
||||
LOG_WARNING(log, "doCompressDataReq HW fail! switch to SW compress ->status: '{}' ", static_cast<size_t>(status));
|
||||
DeflateJobHWPool::instance().releaseJob(job_id);
|
||||
return doCompressDataSW(source, source_size, dest);
|
||||
jobCompAsyncList.push_back(job_id);
|
||||
return job_id;
|
||||
}
|
||||
else
|
||||
{
|
||||
LOG_WARNING(log, "HardwareCodecDeflate::doCompressDataReq fail ->status: '{}' ", static_cast<size_t>(status));
|
||||
DeflateJobHWPool::instance().releaseJob(job_id);
|
||||
return 0;
|
||||
}
|
||||
//LOG_WARNING(log, "doCompressDataReq ->job_id:{}, source_size:{}",job_id, source_size);
|
||||
jobCompAsyncList.push_back(job_id);
|
||||
req_id = job_id;
|
||||
return 0;
|
||||
}
|
||||
|
||||
uint32_t CompressionCodecDeflate::doCompressDataFlush(uint32_t req_id)
|
||||
uint32_t HardwareCodecDeflate::doCompressDataFlush(uint32_t req_id)
|
||||
{
|
||||
uint32_t compressed_size = 0;
|
||||
qpl_job * job_ptr = DeflateJobHWPool::instance().getJobPtr(req_id);
|
||||
if(nullptr != job_ptr)
|
||||
if (nullptr != job_ptr)
|
||||
{
|
||||
while (QPL_STS_BEING_PROCESSED == qpl_check_job(job_ptr))
|
||||
{
|
||||
@ -222,14 +145,14 @@ uint32_t CompressionCodecDeflate::doCompressDataFlush(uint32_t req_id)
|
||||
return compressed_size;
|
||||
}
|
||||
|
||||
void CompressionCodecDeflate::doDecompressData(const char * source, uint32_t source_size, char * dest, uint32_t uncompressed_size) const
|
||||
uint32_t HardwareCodecDeflate::doDecompressData(const char * source, uint32_t source_size, char * dest, uint32_t uncompressed_size) const
|
||||
{
|
||||
uint32_t job_id = 0;
|
||||
qpl_job * job_ptr = DeflateJobHWPool::instance().acquireJob(&job_id);
|
||||
if (job_ptr == nullptr)
|
||||
{
|
||||
LOG_WARNING(log, "doDecompressData HW acquireJob fail! switch to SW decompress");
|
||||
return doDecompressDataSW(source, source_size, dest, uncompressed_size);
|
||||
LOG_WARNING(log, "HardwareCodecDeflate::doDecompressData acquireJob fail!");
|
||||
return 0;
|
||||
}
|
||||
qpl_status status;
|
||||
|
||||
@ -243,49 +166,28 @@ void CompressionCodecDeflate::doDecompressData(const char * source, uint32_t sou
|
||||
|
||||
// Decompression
|
||||
status = qpl_execute_job(job_ptr);
|
||||
if (status != QPL_STS_OK)
|
||||
|
||||
if (status == QPL_STS_OK)
|
||||
{
|
||||
LOG_WARNING(
|
||||
log,
|
||||
"doDecompressData HW fail! switch to SW decompress ->status: '{}' ,source_size: '{}' ,uncompressed_size: '{}' ",
|
||||
static_cast<size_t>(status),
|
||||
source_size,
|
||||
uncompressed_size);
|
||||
doDecompressDataSW(source, source_size, dest, uncompressed_size);
|
||||
DeflateJobHWPool::instance().releaseJob(job_id);
|
||||
return job_ptr->total_out;
|
||||
}
|
||||
DeflateJobHWPool::instance().releaseJob(job_id);
|
||||
}
|
||||
|
||||
void CompressionCodecDeflate::doDecompressDataSW(const char * source, uint32_t source_size, char * dest, uint32_t uncompressed_size)const
|
||||
{
|
||||
qpl_status status;
|
||||
qpl_job * job_ptr = jobSWPtr;
|
||||
|
||||
// Performing a decompression operation
|
||||
job_ptr->op = qpl_op_decompress;
|
||||
job_ptr->next_in_ptr = reinterpret_cast<uint8_t *>(const_cast<char *>(source));
|
||||
job_ptr->next_out_ptr = reinterpret_cast<uint8_t *>(dest);
|
||||
job_ptr->available_in = source_size;
|
||||
job_ptr->available_out = uncompressed_size;
|
||||
job_ptr->flags = QPL_FLAG_FIRST | QPL_FLAG_LAST;
|
||||
|
||||
// Decompression
|
||||
status = qpl_execute_job(job_ptr);
|
||||
if (status != QPL_STS_OK)
|
||||
else
|
||||
{
|
||||
throw Exception("doDecompressDataSW cannot decompress: qpl_execute_job fail:"+ std::to_string(status), ErrorCodes::CANNOT_DECOMPRESS);
|
||||
LOG_WARNING(log, "HardwareCodecDeflate::doDecompressData fail ->status: '{}' ", static_cast<size_t>(status));
|
||||
DeflateJobHWPool::instance().releaseJob(job_id);
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
void CompressionCodecDeflate::doDecompressDataReq(const char * source, uint32_t source_size, char * dest, uint32_t uncompressed_size)
|
||||
uint32_t HardwareCodecDeflate::doDecompressDataReq(const char * source, uint32_t source_size, char * dest, uint32_t uncompressed_size)
|
||||
{
|
||||
uint32_t job_id = 0;
|
||||
qpl_job * job_ptr = DeflateJobHWPool::instance().acquireJob(&job_id);
|
||||
if (job_ptr == nullptr)
|
||||
{
|
||||
LOG_WARNING(log, "doDecompressDataReq acquireJob fail! switch to SW decompress");
|
||||
doDecompressDataSW(source, source_size, dest, uncompressed_size);
|
||||
return;
|
||||
LOG_WARNING(log, "HardwareCodecDeflate::doDecompressDataReq acquireJob fail!");
|
||||
return 0;
|
||||
}
|
||||
qpl_status status;
|
||||
|
||||
@ -302,16 +204,17 @@ void CompressionCodecDeflate::doDecompressDataReq(const char * source, uint32_t
|
||||
if (QPL_STS_OK == status)
|
||||
{
|
||||
jobDecompAsyncMap.insert(std::make_pair(job_id, job_ptr));
|
||||
return job_id;
|
||||
}
|
||||
else
|
||||
{
|
||||
DeflateJobHWPool::instance().releaseJob(job_id);
|
||||
LOG_WARNING(log, "doDecompressDataReq HW fail! switch to SW decompress... ->status: '{}' ", static_cast<size_t>(status));
|
||||
doDecompressDataSW(source, source_size, dest, uncompressed_size);
|
||||
LOG_WARNING(log, "HardwareCodecDeflate::doDecompressDataReq fail ->status: '{}' ", static_cast<size_t>(status));
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
void CompressionCodecDeflate::doDecompressDataFlush()
|
||||
void HardwareCodecDeflate::doDecompressDataFlush()
|
||||
{
|
||||
uint32_t job_id = 0;
|
||||
qpl_job * job_ptr = nullptr;
|
||||
@ -335,6 +238,8 @@ void CompressionCodecDeflate::doDecompressDataFlush()
|
||||
DeflateJobHWPool::instance().releaseJob(job_id);
|
||||
it = jobDecompAsyncMap.erase(it);
|
||||
n_jobs_processing--;
|
||||
if (n_jobs_processing <= 0)
|
||||
break;
|
||||
}
|
||||
if (it == jobDecompAsyncMap.end())
|
||||
{
|
||||
@ -343,11 +248,187 @@ void CompressionCodecDeflate::doDecompressDataFlush()
|
||||
}
|
||||
}
|
||||
}
|
||||
//SoftwareCodecDeflate
|
||||
SoftwareCodecDeflate::SoftwareCodecDeflate()
|
||||
{
|
||||
jobSWPtr = nullptr;
|
||||
}
|
||||
|
||||
SoftwareCodecDeflate::~SoftwareCodecDeflate()
|
||||
{
|
||||
if (nullptr != jobSWPtr)
|
||||
qpl_fini_job(jobSWPtr);
|
||||
}
|
||||
|
||||
qpl_job * SoftwareCodecDeflate::getJobCodecPtr()
|
||||
{
|
||||
if (nullptr == jobSWPtr)
|
||||
{
|
||||
qpl_status status;
|
||||
uint32_t size = 0;
|
||||
// Job initialization
|
||||
status = qpl_get_job_size(qpl_path_software, &size);
|
||||
if (status != QPL_STS_OK)
|
||||
{
|
||||
throw Exception(
|
||||
"SoftwareCodecDeflate::getJobCodecPtr -> qpl_get_job_size fail:" + std::to_string(status), ErrorCodes::CANNOT_COMPRESS);
|
||||
}
|
||||
|
||||
jobSWbuffer = std::make_unique<uint8_t[]>(size);
|
||||
jobSWPtr = reinterpret_cast<qpl_job *>(jobSWbuffer.get());
|
||||
|
||||
status = qpl_init_job(qpl_path_software, jobSWPtr);
|
||||
if (status != QPL_STS_OK)
|
||||
{
|
||||
throw Exception(
|
||||
"SoftwareCodecDeflate::getJobCodecPtr -> qpl_init_job fail:" + std::to_string(status), ErrorCodes::CANNOT_COMPRESS);
|
||||
}
|
||||
}
|
||||
return jobSWPtr;
|
||||
}
|
||||
|
||||
uint32_t SoftwareCodecDeflate::doCompressData(const char * source, uint32_t source_size, char * dest, uint32_t dest_size)
|
||||
{
|
||||
qpl_status status;
|
||||
qpl_job * job_ptr = getJobCodecPtr();
|
||||
// Performing a compression operation
|
||||
job_ptr->op = qpl_op_compress;
|
||||
job_ptr->next_in_ptr = reinterpret_cast<uint8_t *>(const_cast<char *>(source));
|
||||
job_ptr->next_out_ptr = reinterpret_cast<uint8_t *>(dest);
|
||||
job_ptr->available_in = source_size;
|
||||
job_ptr->available_out = dest_size;
|
||||
job_ptr->level = qpl_default_level;
|
||||
job_ptr->flags = QPL_FLAG_FIRST | QPL_FLAG_DYNAMIC_HUFFMAN | QPL_FLAG_LAST | QPL_FLAG_OMIT_VERIFY;
|
||||
|
||||
// Compression
|
||||
status = qpl_execute_job(job_ptr);
|
||||
if (status != QPL_STS_OK)
|
||||
{
|
||||
throw Exception(
|
||||
"SoftwareCodecDeflate::doCompressData -> qpl_execute_job fail:" + std::to_string(status), ErrorCodes::CANNOT_COMPRESS);
|
||||
}
|
||||
|
||||
return job_ptr->total_out;
|
||||
}
|
||||
|
||||
void SoftwareCodecDeflate::doDecompressData(const char * source, uint32_t source_size, char * dest, uint32_t uncompressed_size)
|
||||
{
|
||||
qpl_status status;
|
||||
qpl_job * job_ptr = getJobCodecPtr();
|
||||
|
||||
// Performing a decompression operation
|
||||
job_ptr->op = qpl_op_decompress;
|
||||
job_ptr->next_in_ptr = reinterpret_cast<uint8_t *>(const_cast<char *>(source));
|
||||
job_ptr->next_out_ptr = reinterpret_cast<uint8_t *>(dest);
|
||||
job_ptr->available_in = source_size;
|
||||
job_ptr->available_out = uncompressed_size;
|
||||
job_ptr->flags = QPL_FLAG_FIRST | QPL_FLAG_LAST;
|
||||
|
||||
// Decompression
|
||||
status = qpl_execute_job(job_ptr);
|
||||
if (status != QPL_STS_OK)
|
||||
{
|
||||
throw Exception(
|
||||
"SoftwareCodecDeflate::doDecompressData -> qpl_execute_job fail:" + std::to_string(status), ErrorCodes::CANNOT_DECOMPRESS);
|
||||
}
|
||||
}
|
||||
|
||||
//CompressionCodecDeflate
|
||||
CompressionCodecDeflate::CompressionCodecDeflate()
|
||||
{
|
||||
setCodecDescription("DEFLATE");
|
||||
hwCodec = std::make_unique<HardwareCodecDeflate>();
|
||||
swCodec = std::make_unique<SoftwareCodecDeflate>();
|
||||
}
|
||||
|
||||
uint8_t CompressionCodecDeflate::getMethodByte() const
|
||||
{
|
||||
return static_cast<uint8_t>(CompressionMethodByte::Deflate);
|
||||
}
|
||||
|
||||
bool CompressionCodecDeflate::isAsyncSupported() const
|
||||
{
|
||||
return hwCodec->hwEnabled;
|
||||
}
|
||||
|
||||
void CompressionCodecDeflate::updateHash(SipHash & hash) const
|
||||
{
|
||||
getCodecDesc()->updateTreeHash(hash);
|
||||
}
|
||||
|
||||
#define DEFLATE_COMPRESSBOUND(isize) ((isize) + ((isize) >> 12) + ((isize) >> 14) + ((isize) >> 25) + 13) //Aligned with ZLIB
|
||||
uint32_t CompressionCodecDeflate::getMaxCompressedDataSize(uint32_t uncompressed_size) const
|
||||
{
|
||||
return DEFLATE_COMPRESSBOUND(uncompressed_size);
|
||||
}
|
||||
|
||||
uint32_t CompressionCodecDeflate::doCompressDataSW(const char * source, uint32_t source_size, char * dest) const
|
||||
{
|
||||
return swCodec->doCompressData(source, source_size, dest, getMaxCompressedDataSize(source_size));
|
||||
}
|
||||
|
||||
uint32_t CompressionCodecDeflate::doCompressData(const char * source, uint32_t source_size, char * dest) const
|
||||
{
|
||||
uint32_t res = 0;
|
||||
if (hwCodec->hwEnabled)
|
||||
res = hwCodec->doCompressData(source, source_size, dest, getMaxCompressedDataSize(source_size));
|
||||
if (0 == res)
|
||||
res = swCodec->doCompressData(source, source_size, dest, getMaxCompressedDataSize(source_size));
|
||||
return res;
|
||||
}
|
||||
|
||||
uint32_t CompressionCodecDeflate::doCompressDataReq(const char * source, uint32_t source_size, char * dest, uint32_t & req_id)
|
||||
{
|
||||
if (hwCodec->hwEnabled)
|
||||
req_id = hwCodec->doCompressDataReq(source, source_size, dest, getMaxCompressedDataSize(source_size));
|
||||
else
|
||||
req_id = 0;
|
||||
|
||||
if (0 == req_id)
|
||||
return swCodec->doCompressData(source, source_size, dest, getMaxCompressedDataSize(source_size));
|
||||
else
|
||||
return 0;
|
||||
}
|
||||
|
||||
uint32_t CompressionCodecDeflate::doCompressDataFlush(uint32_t req_id)
|
||||
{
|
||||
if (hwCodec->hwEnabled)
|
||||
return hwCodec->doCompressDataFlush(req_id);
|
||||
else
|
||||
return 0;
|
||||
}
|
||||
|
||||
void CompressionCodecDeflate::doDecompressData(const char * source, uint32_t source_size, char * dest, uint32_t uncompressed_size) const
|
||||
{
|
||||
uint32_t res = 0;
|
||||
if (hwCodec->hwEnabled)
|
||||
res = hwCodec->doDecompressData(source, source_size, dest, uncompressed_size);
|
||||
if (0 == res)
|
||||
swCodec->doDecompressData(source, source_size, dest, uncompressed_size);
|
||||
}
|
||||
|
||||
void CompressionCodecDeflate::doDecompressDataSW(const char * source, uint32_t source_size, char * dest, uint32_t uncompressed_size) const
|
||||
{
|
||||
return swCodec->doDecompressData(source, source_size, dest, uncompressed_size);
|
||||
}
|
||||
|
||||
void CompressionCodecDeflate::doDecompressDataReq(const char * source, uint32_t source_size, char * dest, uint32_t uncompressed_size)
|
||||
{
|
||||
uint32_t res = 0;
|
||||
if (hwCodec->hwEnabled)
|
||||
res = hwCodec->doDecompressDataReq(source, source_size, dest, uncompressed_size);
|
||||
if (0 == res)
|
||||
swCodec->doDecompressData(source, source_size, dest, uncompressed_size);
|
||||
}
|
||||
|
||||
void CompressionCodecDeflate::doDecompressDataFlush()
|
||||
{
|
||||
if (hwCodec->hwEnabled)
|
||||
hwCodec->doDecompressDataFlush();
|
||||
}
|
||||
void registerCodecDeflate(CompressionCodecFactory & factory)
|
||||
{
|
||||
factory.registerSimpleCompressionCodec(
|
||||
"DEFLATE", static_cast<char>(CompressionMethodByte::Deflate), [&]() { return std::make_shared<CompressionCodecDeflate>(); });
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -26,10 +26,15 @@ public:
|
||||
static constexpr qpl_path_t PATH = qpl_path_hardware;
|
||||
static qpl_job * jobPool[jobPoolSize];
|
||||
static std::atomic_bool jobLock[jobPoolSize];
|
||||
bool jobPoolEnabled;
|
||||
|
||||
bool ALWAYS_INLINE jobPoolReady()
|
||||
{
|
||||
return jobPoolEnabled;
|
||||
}
|
||||
qpl_job * ALWAYS_INLINE acquireJob(uint32_t * job_id)
|
||||
{
|
||||
if(jobPoolAvailable)
|
||||
if (jobPoolEnabled)
|
||||
{
|
||||
uint32_t retry = 0;
|
||||
auto index = random(jobPoolSize);
|
||||
@ -52,7 +57,7 @@ public:
|
||||
}
|
||||
qpl_job * ALWAYS_INLINE releaseJob(uint32_t job_id)
|
||||
{
|
||||
if(jobPoolAvailable)
|
||||
if (jobPoolEnabled)
|
||||
{
|
||||
uint32_t index = jobPoolSize - job_id;
|
||||
ReleaseJobObjectGuard _(index);
|
||||
@ -65,7 +70,7 @@ public:
|
||||
}
|
||||
qpl_job * ALWAYS_INLINE getJobPtr(uint32_t job_id)
|
||||
{
|
||||
if(jobPoolAvailable)
|
||||
if (jobPoolEnabled)
|
||||
{
|
||||
uint32_t index = jobPoolSize - job_id;
|
||||
return jobPool[index];
|
||||
@ -77,8 +82,6 @@ public:
|
||||
}
|
||||
|
||||
private:
|
||||
bool jobPoolAvailable;
|
||||
Poco::Logger * log;
|
||||
size_t ALWAYS_INLINE random(uint32_t pool_size)
|
||||
{
|
||||
size_t tsc = 0;
|
||||
@ -177,13 +180,45 @@ private:
|
||||
jobLock[index].store(false);
|
||||
}
|
||||
};
|
||||
Poco::Logger * log;
|
||||
};
|
||||
class SoftwareCodecDeflate
|
||||
{
|
||||
public:
|
||||
SoftwareCodecDeflate();
|
||||
~SoftwareCodecDeflate();
|
||||
uint32_t doCompressData(const char * source, uint32_t source_size, char * dest, uint32_t dest_size);
|
||||
void doDecompressData(const char * source, uint32_t source_size, char * dest, uint32_t uncompressed_size);
|
||||
|
||||
private:
|
||||
qpl_job * jobSWPtr; //Software Job Codec Ptr
|
||||
std::unique_ptr<uint8_t[]> jobSWbuffer;
|
||||
qpl_job * getJobCodecPtr();
|
||||
};
|
||||
|
||||
class HardwareCodecDeflate
|
||||
{
|
||||
public:
|
||||
bool hwEnabled;
|
||||
HardwareCodecDeflate();
|
||||
~HardwareCodecDeflate();
|
||||
uint32_t doCompressData(const char * source, uint32_t source_size, char * dest, uint32_t dest_size) const;
|
||||
uint32_t doCompressDataReq(const char * source, uint32_t source_size, char * dest, uint32_t dest_size);
|
||||
uint32_t doCompressDataFlush(uint32_t req_id);
|
||||
uint32_t doDecompressData(const char * source, uint32_t source_size, char * dest, uint32_t uncompressed_size) const;
|
||||
uint32_t doDecompressDataReq(const char * source, uint32_t source_size, char * dest, uint32_t uncompressed_size);
|
||||
void doDecompressDataFlush();
|
||||
|
||||
private:
|
||||
std::map<uint32_t, qpl_job *> jobDecompAsyncMap;
|
||||
std::vector<uint32_t> jobCompAsyncList;
|
||||
Poco::Logger * log;
|
||||
};
|
||||
class CompressionCodecDeflate : public ICompressionCodec
|
||||
{
|
||||
public:
|
||||
CompressionCodecDeflate();
|
||||
~CompressionCodecDeflate() override;
|
||||
//~CompressionCodecDeflate() ;
|
||||
uint8_t getMethodByte() const override;
|
||||
void updateHash(SipHash & hash) const override;
|
||||
bool isAsyncSupported() const override;
|
||||
@ -197,10 +232,9 @@ protected:
|
||||
{
|
||||
return true;
|
||||
}
|
||||
qpl_job * initSoftwareJobCodecPtr();
|
||||
uint32_t doCompressData(const char * source, uint32_t source_size, char * dest) const override;
|
||||
uint32_t doCompressDataSW(const char * source, uint32_t source_size, char * dest) const;
|
||||
UInt32 doCompressDataReq(const char * source, UInt32 source_size, char * dest, uint32_t & req_id) override;
|
||||
uint32_t doCompressDataReq(const char * source, uint32_t source_size, char * dest, uint32_t & req_id) override;
|
||||
uint32_t doCompressDataFlush(uint32_t req_id) override;
|
||||
|
||||
void doDecompressData(const char * source, uint32_t source_size, char * dest, uint32_t uncompressed_size) const override;
|
||||
@ -210,10 +244,8 @@ protected:
|
||||
|
||||
private:
|
||||
uint32_t getMaxCompressedDataSize(uint32_t uncompressed_size) const override;
|
||||
std::map<uint32_t, qpl_job *> jobDecompAsyncMap;
|
||||
std::vector<uint32_t> jobCompAsyncList;
|
||||
Poco::Logger * log;
|
||||
qpl_job * jobSWPtr; //Software Job Codec Ptr
|
||||
std::unique_ptr<HardwareCodecDeflate> hwCodec;
|
||||
std::unique_ptr<SoftwareCodecDeflate> swCodec;
|
||||
};
|
||||
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user