fixed cosmetic issues

This commit is contained in:
jinjunzh 2022-07-11 12:08:35 -04:00
parent bc03e10870
commit bcba581191
2 changed files with 67 additions and 64 deletions

View File

@ -15,39 +15,39 @@ namespace ErrorCodes
extern const int CANNOT_COMPRESS;
extern const int CANNOT_DECOMPRESS;
}
qpl_job * DeflateQplJobHWPool::hw_job_ptr_pool[JOB_NUMBER];
std::atomic_bool DeflateQplJobHWPool::hw_job_ptr_locks[JOB_NUMBER];
std::array<qpl_job *, DeflateQplJobHWPool::MAX_HW_JOB_NUMBER> DeflateQplJobHWPool::hw_job_ptr_pool;
std::array<std::atomic_bool, DeflateQplJobHWPool::MAX_HW_JOB_NUMBER> DeflateQplJobHWPool::hw_job_ptr_locks;
bool DeflateQplJobHWPool::job_pool_ready;
std::unique_ptr<uint8_t[]> DeflateQplJobHWPool::hw_job_buffer;
std::unique_ptr<uint8_t[]> DeflateQplJobHWPool::hw_jobs_buffer;
DeflateQplJobHWPool & DeflateQplJobHWPool::instance()
{
static DeflateQplJobHWPool ret;
return ret;
static DeflateQplJobHWPool pool;
return pool;
}
DeflateQplJobHWPool::DeflateQplJobHWPool():
log(&Poco::Logger::get("DeflateQplJobHWPool")),
random_engine(std::random_device()()),
distribution(0, JOB_NUMBER-1)
DeflateQplJobHWPool::DeflateQplJobHWPool()
:random_engine(std::random_device()())
,distribution(0, MAX_HW_JOB_NUMBER-1)
{
Poco::Logger * log = &Poco::Logger::get("DeflateQplJobHWPool");
uint32_t job_size = 0;
uint32_t index = 0;
const char * qpl_version = qpl_get_library_version();
/// Get size required for saving a single qpl job object
qpl_get_job_size(PATH, &job_size);
qpl_get_job_size(qpl_path_hardware, &job_size);
/// Allocate entire buffer for storing all job objects
hw_job_buffer = std::make_unique<uint8_t[]>(job_size * JOB_NUMBER);
hw_jobs_buffer = std::make_unique<uint8_t[]>(job_size * MAX_HW_JOB_NUMBER);
/// Initialize pool for storing all job object pointers
memset(hw_job_ptr_pool, 0, JOB_NUMBER*sizeof(qpl_job *));
/// Reallocate buffer for each job object through shifting address offset
for (index = 0; index < JOB_NUMBER; ++index)
/// Reallocate buffer by shifting address offset for each job object.
for (index = 0; index < MAX_HW_JOB_NUMBER; ++index)
{
qpl_job * qpl_job_ptr = reinterpret_cast<qpl_job *>(hw_job_buffer.get() + index*job_size);
if ((!qpl_job_ptr) || (qpl_init_job(PATH, qpl_job_ptr) != QPL_STS_OK))
qpl_job * qpl_job_ptr = reinterpret_cast<qpl_job *>(hw_jobs_buffer.get() + index * job_size);
if (qpl_init_job(qpl_path_hardware, qpl_job_ptr) != QPL_STS_OK)
{
jobPoolReady() = false;
job_pool_ready = false;
LOG_WARNING(log, "Initialization of hardware-assisted DeflateQpl codec failed, falling back to software DeflateQpl codec. Please check if Intel In-Memory Analytics Accelerator (IAA) is properly set up. QPL Version:{}.",qpl_version);
return;
}
@ -55,13 +55,13 @@ DeflateQplJobHWPool::DeflateQplJobHWPool():
unLockJob(index);
}
jobPoolReady() = true;
job_pool_ready = true;
LOG_DEBUG(log, "Hardware-assisted DeflateQpl codec is ready! QPL Version:{}",qpl_version);
}
DeflateQplJobHWPool::~DeflateQplJobHWPool()
{
for (uint32_t i = 0; i < JOB_NUMBER; ++i)
for (uint32_t i = 0; i < MAX_HW_JOB_NUMBER; ++i)
{
if (hw_job_ptr_pool[i])
{
@ -71,26 +71,26 @@ DeflateQplJobHWPool::~DeflateQplJobHWPool()
hw_job_ptr_pool[i] = nullptr;
}
}
jobPoolReady() = false;
job_pool_ready = false;
}
qpl_job * DeflateQplJobHWPool::acquireJob(uint32_t * job_id)
{
if (jobPoolReady())
if (isJobPoolReady())
{
uint32_t retry = 0;
auto index = distribution(random_engine);
while (tryLockJob(index) == false)
while (!tryLockJob(index))
{
index = distribution(random_engine);
retry++;
if (retry > JOB_NUMBER)
if (retry > MAX_HW_JOB_NUMBER)
{
return nullptr;
}
}
*job_id = JOB_NUMBER - index;
assert(index < JOB_NUMBER);
*job_id = MAX_HW_JOB_NUMBER - index;
assert(index < MAX_HW_JOB_NUMBER);
return hw_job_ptr_pool[index];
}
else
@ -99,10 +99,10 @@ qpl_job * DeflateQplJobHWPool::acquireJob(uint32_t * job_id)
qpl_job * DeflateQplJobHWPool::releaseJob(uint32_t job_id)
{
if (jobPoolReady())
if (isJobPoolReady())
{
uint32_t index = JOB_NUMBER - job_id;
assert(index < JOB_NUMBER);
uint32_t index = MAX_HW_JOB_NUMBER - job_id;
assert(index < MAX_HW_JOB_NUMBER);
ReleaseJobObjectGuard _(index);
return hw_job_ptr_pool[index];
}
@ -113,13 +113,13 @@ qpl_job * DeflateQplJobHWPool::releaseJob(uint32_t job_id)
bool DeflateQplJobHWPool::tryLockJob(size_t index)
{
bool expected = false;
assert(index < JOB_NUMBER);
assert(index < MAX_HW_JOB_NUMBER);
return hw_job_ptr_locks[index].compare_exchange_strong(expected, true);
}
//HardwareCodecDeflateQpl
HardwareCodecDeflateQpl::HardwareCodecDeflateQpl():
log(&Poco::Logger::get("HardwareCodecDeflateQpl"))
HardwareCodecDeflateQpl::HardwareCodecDeflateQpl()
:log(&Poco::Logger::get("HardwareCodecDeflateQpl"))
{
}
@ -167,13 +167,13 @@ int32_t HardwareCodecDeflateQpl::doCompressData(const char * source, uint32_t so
return compressed_size;
}
int32_t HardwareCodecDeflateQpl::doDecompressData(const char * source, uint32_t source_size, char * dest, uint32_t uncompressed_size) const
int32_t HardwareCodecDeflateQpl::doDecompressDataSynchronous(const char * source, uint32_t source_size, char * dest, uint32_t uncompressed_size) const
{
uint32_t job_id = 0;
qpl_job* job_ptr = nullptr;
if (!(job_ptr = DeflateQplJobHWPool::instance().acquireJob(&job_id)))
{
LOG_WARNING(log, "DeflateQpl HW codec failed, falling back to SW codec.(Details: doDecompressData->acquireJob fail, probably job pool exhausted)");
LOG_WARNING(log, "DeflateQpl HW codec failed, falling back to SW codec.(Details: doDecompressDataSynchronous->acquireJob fail, probably job pool exhausted)");
return RET_ERROR;
}
// Performing a decompression operation
@ -191,19 +191,19 @@ int32_t HardwareCodecDeflateQpl::doDecompressData(const char * source, uint32_t
}
else
{
LOG_WARNING(log, "DeflateQpl HW codec failed, falling back to SW codec.(Details: doDecompressData->qpl_execute_job with error code:{} - please refer to qpl_status in ./contrib/qpl/include/qpl/c_api/status.h)", status);
LOG_WARNING(log, "DeflateQpl HW codec failed, falling back to SW codec.(Details: doDecompressDataSynchronous->qpl_execute_job with error code:{} - please refer to qpl_status in ./contrib/qpl/include/qpl/c_api/status.h)", status);
DeflateQplJobHWPool::instance().releaseJob(job_id);
return RET_ERROR;
}
}
int32_t HardwareCodecDeflateQpl::doDecompressDataReq(const char * source, uint32_t source_size, char * dest, uint32_t uncompressed_size)
int32_t HardwareCodecDeflateQpl::doDecompressDataAsynchronous(const char * source, uint32_t source_size, char * dest, uint32_t uncompressed_size)
{
uint32_t job_id = 0;
qpl_job * job_ptr = nullptr;
if (!(job_ptr = DeflateQplJobHWPool::instance().acquireJob(&job_id)))
{
LOG_WARNING(log, "DeflateQpl HW codec failed, falling back to SW codec.(Details: doDecompressDataReq->acquireJob fail, probably job pool exhausted)");
LOG_WARNING(log, "DeflateQpl HW codec failed, falling back to SW codec.(Details: doDecompressDataAsynchronous->acquireJob fail, probably job pool exhausted)");
return RET_ERROR;
}
@ -223,7 +223,7 @@ int32_t HardwareCodecDeflateQpl::doDecompressDataReq(const char * source, uint32
else
{
DeflateQplJobHWPool::instance().releaseJob(job_id);
LOG_WARNING(log, "DeflateQpl HW codec failed, falling back to SW codec.(Details: doDecompressDataReq->qpl_execute_job with error code:{} - please refer to qpl_status in ./contrib/qpl/include/qpl/c_api/status.h)", status);
LOG_WARNING(log, "DeflateQpl HW codec failed, falling back to SW codec.(Details: doDecompressDataAsynchronous->qpl_execute_job with error code:{} - please refer to qpl_status in ./contrib/qpl/include/qpl/c_api/status.h)", status);
return RET_ERROR;
}
}
@ -321,9 +321,9 @@ void SoftwareCodecDeflateQpl::doDecompressData(const char * source, uint32_t sou
}
//CompressionCodecDeflateQpl
CompressionCodecDeflateQpl::CompressionCodecDeflateQpl():
hw_codec(std::make_unique<HardwareCodecDeflateQpl>()),
sw_codec(std::make_unique<SoftwareCodecDeflateQpl>())
CompressionCodecDeflateQpl::CompressionCodecDeflateQpl()
:hw_codec(std::make_unique<HardwareCodecDeflateQpl>())
,sw_codec(std::make_unique<SoftwareCodecDeflateQpl>())
{
setCodecDescription("DEFLATE_QPL");
}
@ -347,7 +347,7 @@ uint32_t CompressionCodecDeflateQpl::getMaxCompressedDataSize(uint32_t uncompres
uint32_t CompressionCodecDeflateQpl::doCompressData(const char * source, uint32_t source_size, char * dest) const
{
int32_t res = HardwareCodecDeflateQpl::RET_ERROR;
if (DeflateQplJobHWPool::instance().jobPoolReady())
if (DeflateQplJobHWPool::instance().isJobPoolReady())
res = hw_codec->doCompressData(source, source_size, dest, getMaxCompressedDataSize(source_size));
if (res == HardwareCodecDeflateQpl::RET_ERROR)
res = sw_codec->doCompressData(source, source_size, dest, getMaxCompressedDataSize(source_size));
@ -361,30 +361,31 @@ void CompressionCodecDeflateQpl::doDecompressData(const char * source, uint32_t
case CodecMode::Synchronous:
{
int32_t res = HardwareCodecDeflateQpl::RET_ERROR;
if (DeflateQplJobHWPool::instance().jobPoolReady())
res = hw_codec->doDecompressData(source, source_size, dest, uncompressed_size);
if (DeflateQplJobHWPool::instance().isJobPoolReady())
res = hw_codec->doDecompressDataSynchronous(source, source_size, dest, uncompressed_size);
if (res == HardwareCodecDeflateQpl::RET_ERROR)
sw_codec->doDecompressData(source, source_size, dest, uncompressed_size);
break;
return;
}
case CodecMode::Asynchronous:
{
int32_t res = HardwareCodecDeflateQpl::RET_ERROR;
if (DeflateQplJobHWPool::instance().jobPoolReady())
res = hw_codec->doDecompressDataReq(source, source_size, dest, uncompressed_size);
if (DeflateQplJobHWPool::instance().isJobPoolReady())
res = hw_codec->doDecompressDataAsynchronous(source, source_size, dest, uncompressed_size);
if (res == HardwareCodecDeflateQpl::RET_ERROR)
sw_codec->doDecompressData(source, source_size, dest, uncompressed_size);
break;
return;
}
case CodecMode::SoftwareFallback:
sw_codec->doDecompressData(source, source_size, dest, uncompressed_size);
break;
return;
}
__builtin_unreachable();
}
void CompressionCodecDeflateQpl::flushAsynchronousDecompressRequests()
{
if (DeflateQplJobHWPool::instance().jobPoolReady())
if (DeflateQplJobHWPool::instance().isJobPoolReady())
hw_codec->flushAsynchronousDecompressRequests();
setDecompressMode(CodecMode::Synchronous);
}

View File

@ -19,18 +19,20 @@ public:
DeflateQplJobHWPool();
~DeflateQplJobHWPool();
bool & jobPoolReady() { return job_pool_ready;}
qpl_job * acquireJob(uint32_t * job_id);
qpl_job * releaseJob(uint32_t job_id);
static qpl_job * releaseJob(uint32_t job_id);
static const bool & isJobPoolReady() { return job_pool_ready; }
static DeflateQplJobHWPool & instance();
private:
bool tryLockJob(size_t index);
static constexpr auto MAX_HW_JOB_NUMBER = 1024;
void unLockJob(uint32_t index) { hw_job_ptr_locks[index].store(false); }
private:
static bool tryLockJob(size_t index);
static void unLockJob(uint32_t index) { hw_job_ptr_locks[index].store(false); }
class ReleaseJobObjectGuard
{
@ -43,16 +45,15 @@ private:
~ReleaseJobObjectGuard(){ hw_job_ptr_locks[index].store(false); }
};
static constexpr auto JOB_NUMBER = 1024;
static constexpr qpl_path_t PATH = qpl_path_hardware;
static qpl_job * hw_job_ptr_pool[JOB_NUMBER];
static std::atomic_bool hw_job_ptr_locks[JOB_NUMBER];
/// Entire buffer for storing all job objects
static std::unique_ptr<uint8_t[]> hw_jobs_buffer;
/// Job pool for storing all job object pointers
static std::array<qpl_job *, DeflateQplJobHWPool::MAX_HW_JOB_NUMBER> hw_job_ptr_pool;
/// Locks for accessing each job object pointers
static std::array<std::atomic_bool, DeflateQplJobHWPool::MAX_HW_JOB_NUMBER> hw_job_ptr_locks;
static bool job_pool_ready;
static std::unique_ptr<uint8_t[]> hw_job_buffer;
Poco::Logger * log;
std::mt19937 random_engine;
std::uniform_int_distribution<int> distribution;
};
class SoftwareCodecDeflateQpl
@ -76,8 +77,8 @@ public:
HardwareCodecDeflateQpl();
~HardwareCodecDeflateQpl();
int32_t doCompressData(const char * source, uint32_t source_size, char * dest, uint32_t dest_size) const;
int32_t doDecompressData(const char * source, uint32_t source_size, char * dest, uint32_t uncompressed_size) const;
int32_t doDecompressDataReq(const char * source, uint32_t source_size, char * dest, uint32_t uncompressed_size);
int32_t doDecompressDataSynchronous(const char * source, uint32_t source_size, char * dest, uint32_t uncompressed_size) const;
int32_t doDecompressDataAsynchronous(const char * source, uint32_t source_size, char * dest, uint32_t uncompressed_size);
/// Flush result for previous asynchronous decompression requests.Must be used following with several calls of doDecompressDataReq.
void flushAsynchronousDecompressRequests();
@ -88,6 +89,7 @@ private:
std::map<uint32_t, qpl_job *> decomp_async_job_map;
Poco::Logger * log;
};
class CompressionCodecDeflateQpl : public ICompressionCodec
{
public: