Merge branch 'iaadeflate_upgrade_qpl_v1.3.0' of https://github.com/jinjunzh/ClickHouse into iaadeflate_upgrade_qpl_v1.3.0

This commit is contained in:
jinjunzh 2023-11-29 18:45:42 -05:00
commit 1402829c6a
2 changed files with 17 additions and 17 deletions

View File

@ -139,7 +139,6 @@ void DeflateQplJobHWPool::unLockJob(UInt32 index)
hw_job_ptr_locks[index].store(false);
}
//HardwareCodecDeflateQpl
HardwareCodecDeflateQpl::HardwareCodecDeflateQpl(SoftwareCodecDeflateQpl & sw_codec_)
: log(&Poco::Logger::get("HardwareCodecDeflateQpl"))
, sw_codec(sw_codec_)
@ -170,7 +169,7 @@ Int32 HardwareCodecDeflateQpl::doCompressData(const char * source, UInt32 source
UInt32 compressed_size = 0;
if (!(job_ptr = DeflateQplJobHWPool::instance().acquireJob(job_id)))
{
LOG_INFO(log, "DeflateQpl HW codec failed, falling back to SW codec.(Details: doCompressData->acquireJob fail, probably job pool exhausted)");
LOG_INFO(log, "DeflateQpl HW codec failed, falling back to SW codec. (Details: doCompressData->acquireJob fail, probably job pool exhausted)");
return RET_ERROR;
}
@ -190,7 +189,7 @@ Int32 HardwareCodecDeflateQpl::doCompressData(const char * source, UInt32 source
}
else
{
LOG_WARNING(log, "DeflateQpl HW codec failed, falling back to SW codec.(Details: doCompressData->qpl_execute_job with error code: {} - please refer to qpl_status in ./contrib/qpl/include/qpl/c_api/status.h)", static_cast<UInt32>(status));
LOG_WARNING(log, "DeflateQpl HW codec failed, falling back to SW codec. (Details: doCompressData->qpl_execute_job with error code: {} - please refer to qpl_status in ./contrib/qpl/include/qpl/c_api/status.h)", static_cast<UInt32>(status));
DeflateQplJobHWPool::instance().releaseJob(job_id);
return RET_ERROR;
}
@ -203,7 +202,7 @@ Int32 HardwareCodecDeflateQpl::doDecompressDataSynchronous(const char * source,
UInt32 decompressed_size = 0;
if (!(job_ptr = DeflateQplJobHWPool::instance().acquireJob(job_id)))
{
LOG_INFO(log, "DeflateQpl HW codec failed, falling back to SW codec.(Details: doDecompressDataSynchronous->acquireJob fail, probably job pool exhausted)");
LOG_INFO(log, "DeflateQpl HW codec failed, falling back to SW codec. (Details: doDecompressDataSynchronous->acquireJob fail, probably job pool exhausted)");
return RET_ERROR;
}
@ -219,22 +218,23 @@ Int32 HardwareCodecDeflateQpl::doDecompressDataSynchronous(const char * source,
if (status != QPL_STS_OK)
{
DeflateQplJobHWPool::instance().releaseJob(job_id);
LOG_WARNING(log, "DeflateQpl HW codec failed, falling back to SW codec.(Details: doDecompressDataSynchronous->qpl_submit_job with error code: {} - please refer to qpl_status in ./contrib/qpl/include/qpl/c_api/status.h)", static_cast<UInt32>(status));
LOG_WARNING(log, "DeflateQpl HW codec failed, falling back to SW codec. (Details: doDecompressDataSynchronous->qpl_submit_job with error code: {} - please refer to qpl_status in ./contrib/qpl/include/qpl/c_api/status.h)", static_cast<UInt32>(status));
return RET_ERROR;
}
/// Busy waiting till job complete.
UInt32 check_time = 0;
UInt32 num_checks = 0;
do
{
_tpause(1, __rdtsc() + 1000);
status = qpl_check_job(job_ptr);
++check_time;
} while (status == QPL_STS_BEING_PROCESSED && check_time < MAX_CHECKS);
++num_checks;
} while (status == QPL_STS_BEING_PROCESSED && num_checks < MAX_CHECKS);
if (status != QPL_STS_OK)
{
DeflateQplJobHWPool::instance().releaseJob(job_id);
LOG_WARNING(log, "DeflateQpl HW codec failed, falling back to SW codec.(Details: doDecompressDataSynchronous->qpl_submit_job with error code: {} - please refer to qpl_status in ./contrib/qpl/include/qpl/c_api/status.h)", static_cast<UInt32>(status));
LOG_WARNING(log, "DeflateQpl HW codec failed, falling back to SW codec. (Details: doDecompressDataSynchronous->qpl_submit_job with error code: {} - please refer to qpl_status in ./contrib/qpl/include/qpl/c_api/status.h)", static_cast<UInt32>(status));
return RET_ERROR;
}
@ -249,7 +249,7 @@ Int32 HardwareCodecDeflateQpl::doDecompressDataAsynchronous(const char * source,
qpl_job * job_ptr = nullptr;
if (!(job_ptr = DeflateQplJobHWPool::instance().acquireJob(job_id)))
{
LOG_INFO(log, "DeflateQpl HW codec failed, falling back to SW codec.(Details: doDecompressDataAsynchronous->acquireJob fail, probably job pool exhausted)");
LOG_INFO(log, "DeflateQpl HW codec failed, falling back to SW codec. (Details: doDecompressDataAsynchronous->acquireJob fail, probably job pool exhausted)");
return RET_ERROR;
}
@ -269,7 +269,7 @@ Int32 HardwareCodecDeflateQpl::doDecompressDataAsynchronous(const char * source,
else
{
DeflateQplJobHWPool::instance().releaseJob(job_id);
LOG_WARNING(log, "DeflateQpl HW codec failed, falling back to SW codec.(Details: doDecompressDataAsynchronous->qpl_submit_job with error code: {} - please refer to qpl_status in ./contrib/qpl/include/qpl/c_api/status.h)", static_cast<UInt32>(status));
LOG_WARNING(log, "DeflateQpl HW codec failed, falling back to SW codec. (Details: doDecompressDataAsynchronous->qpl_submit_job with error code: {} - please refer to qpl_status in ./contrib/qpl/include/qpl/c_api/status.h)", static_cast<UInt32>(status));
return RET_ERROR;
}
}
@ -278,7 +278,7 @@ void HardwareCodecDeflateQpl::flushAsynchronousDecompressRequests()
{
auto n_jobs_processing = decomp_async_job_map.size();
std::map<UInt32, qpl_job *>::iterator it = decomp_async_job_map.begin();
UInt32 check_time = 0;
UInt32 num_checks = 0;
while (n_jobs_processing)
{
@ -288,7 +288,7 @@ void HardwareCodecDeflateQpl::flushAsynchronousDecompressRequests()
job_ptr = it->second;
auto status = qpl_check_job(job_ptr);
if ((status == QPL_STS_BEING_PROCESSED) && (check_time < MAX_CHECKS))
if ((status == QPL_STS_BEING_PROCESSED) && (num_checks < MAX_CHECKS))
{
it++;
}
@ -301,7 +301,7 @@ void HardwareCodecDeflateQpl::flushAsynchronousDecompressRequests()
job_ptr->available_in,
reinterpret_cast<char *>(job_ptr->next_out_ptr),
job_ptr->available_out);
LOG_WARNING(log, "DeflateQpl HW codec failed, falling back to SW codec.(Details: flushAsynchronousDecompressRequests with error code: {} - please refer to qpl_status in ./contrib/qpl/include/qpl/c_api/status.h)", static_cast<UInt32>(status));
LOG_WARNING(log, "DeflateQpl HW codec failed, falling back to SW codec. (Details: flushAsynchronousDecompressRequests with error code: {} - please refer to qpl_status in ./contrib/qpl/include/qpl/c_api/status.h)", static_cast<UInt32>(status));
}
it = decomp_async_job_map.erase(it);
DeflateQplJobHWPool::instance().releaseJob(job_id);
@ -314,7 +314,7 @@ void HardwareCodecDeflateQpl::flushAsynchronousDecompressRequests()
{
it = decomp_async_job_map.begin();
_tpause(1, __rdtsc() + 1000);
++check_time;
++num_checks;
}
}
}

View File

@ -68,7 +68,7 @@ public:
/// Maximum times to check if hardware job complete, otherwise fallback to software codec.
static constexpr UInt32 MAX_CHECKS = UINT16_MAX;
HardwareCodecDeflateQpl(SoftwareCodecDeflateQpl& codec);
HardwareCodecDeflateQpl(SoftwareCodecDeflateQpl & codec);
~HardwareCodecDeflateQpl();
Int32 doCompressData(const char * source, UInt32 source_size, char * dest, UInt32 dest_size) const;
@ -89,7 +89,7 @@ private:
/// For flush, pop out job ID && job object from this map. Use job ID to release job lock and use job object to check job status till complete.
std::map<UInt32, qpl_job *> decomp_async_job_map;
Poco::Logger * log;
/// sw_codec provides a fallback in case of errors.
/// Provides a fallback in case of errors.
SoftwareCodecDeflateQpl & sw_codec;
};