mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-14 19:45:11 +00:00
revise several cosmetic issues
This commit is contained in:
parent
d4727288f8
commit
4e02ef4650
@ -24,65 +24,69 @@ DeflateQplJobHWPool & DeflateQplJobHWPool::instance()
|
||||
}
|
||||
|
||||
DeflateQplJobHWPool::DeflateQplJobHWPool()
|
||||
: hw_jobs_max_number(0)
|
||||
: max_hw_jobs(0)
|
||||
, random_engine(std::random_device()())
|
||||
{
|
||||
Poco::Logger * log = &Poco::Logger::get("DeflateQplJobHWPool");
|
||||
const char * qpl_version = qpl_get_library_version();
|
||||
|
||||
// Loop over all configured work queues and sum their sizes to get the maximum job number.
|
||||
accfg_ctx *ctx_ptr = nullptr;
|
||||
accfg_ctx * ctx_ptr = nullptr;
|
||||
auto ctx_status = accfg_new(&ctx_ptr);
|
||||
if (ctx_status == 0)
|
||||
{
|
||||
auto *dev_ptr = accfg_device_get_first(ctx_ptr);
|
||||
while (nullptr != dev_ptr) { // loop all devices
|
||||
for (auto *wq_ptr = accfg_wq_get_first(dev_ptr); \
|
||||
wq_ptr != nullptr; \
|
||||
wq_ptr = accfg_wq_get_next(wq_ptr))
|
||||
hw_jobs_max_number += accfg_wq_get_size(wq_ptr);
|
||||
auto * dev_ptr = accfg_device_get_first(ctx_ptr);
|
||||
while (dev_ptr != nullptr)
|
||||
{
|
||||
for (auto * wq_ptr = accfg_wq_get_first(dev_ptr); wq_ptr != nullptr; wq_ptr = accfg_wq_get_next(wq_ptr))
|
||||
max_hw_jobs += accfg_wq_get_size(wq_ptr);
|
||||
dev_ptr = accfg_device_get_next(dev_ptr);
|
||||
}
|
||||
}
|
||||
|
||||
if (hw_jobs_max_number == 0)
|
||||
else
|
||||
{
|
||||
job_pool_ready = false;
|
||||
LOG_WARNING(log, "Initialization of hardware-assisted DeflateQpl codec failed, falling back to software DeflateQpl codec. Please check if Intel In-Memory Analytics Accelerator (IAA) is properly set up. accfg_context_status: {} ,total_wq_size: {} , QPL Version: {}.", ctx_status, hw_jobs_max_number, qpl_version);
|
||||
LOG_WARNING(log, "Initialization of hardware-assisted DeflateQpl codec failed, falling back to software DeflateQpl codec. Failed to create new libaccel_config context -> status: {}, QPL Version: {}.", ctx_status, qpl_version);
|
||||
return;
|
||||
}
|
||||
distribution = std::uniform_int_distribution<int>(0, hw_jobs_max_number - 1);
|
||||
|
||||
if (max_hw_jobs == 0)
|
||||
{
|
||||
job_pool_ready = false;
|
||||
LOG_WARNING(log, "Initialization of hardware-assisted DeflateQpl codec failed, falling back to software DeflateQpl codec. Failed to get available workqueue size -> total_wq_size: {}, QPL Version: {}.", max_hw_jobs, qpl_version);
|
||||
return;
|
||||
}
|
||||
distribution = std::uniform_int_distribution<int>(0, max_hw_jobs - 1);
|
||||
/// Get size required for saving a single qpl job object
|
||||
qpl_get_job_size(qpl_path_hardware, &per_job_size);
|
||||
/// Allocate job buffer pool for storing all job objects
|
||||
hw_jobs_buffer = std::make_unique<uint8_t[]>(per_job_size * hw_jobs_max_number);
|
||||
hw_job_ptr_locks = std::make_unique<std::atomic_bool[]>(hw_jobs_max_number);
|
||||
hw_jobs_buffer = std::make_unique<uint8_t[]>(per_job_size * max_hw_jobs);
|
||||
hw_job_ptr_locks = std::make_unique<std::atomic_bool[]>(max_hw_jobs);
|
||||
/// Initialize all job objects in job buffer pool
|
||||
for (UInt32 index = 0; index < hw_jobs_max_number; ++index)
|
||||
for (UInt32 index = 0; index < max_hw_jobs; ++index)
|
||||
{
|
||||
qpl_job * job_ptr = reinterpret_cast<qpl_job *>(hw_jobs_buffer.get() + index * per_job_size);
|
||||
if (auto status = qpl_init_job(qpl_path_hardware, job_ptr); status != QPL_STS_OK)
|
||||
{
|
||||
job_pool_ready = false;
|
||||
LOG_WARNING(log, "Initialization of hardware-assisted DeflateQpl codec failed, falling back to software DeflateQpl codec. Please check if Intel In-Memory Analytics Accelerator (IAA) is properly set up. Status code: {}, QPL Version: {}.", static_cast<UInt32>(status), qpl_version);
|
||||
LOG_WARNING(log, "Initialization of hardware-assisted DeflateQpl codec failed, falling back to software DeflateQpl codec. Failed to Initialize qpl job -> status: {}, QPL Version: {}.", static_cast<UInt32>(status), qpl_version);
|
||||
return;
|
||||
}
|
||||
unLockJob(index);
|
||||
}
|
||||
|
||||
job_pool_ready = true;
|
||||
LOG_DEBUG(log, "Hardware-assisted DeflateQpl codec is ready! QPL Version: {}, hw_jobs_max_number: {}",qpl_version, hw_jobs_max_number);
|
||||
LOG_DEBUG(log, "Hardware-assisted DeflateQpl codec is ready! QPL Version: {}, max_hw_jobs: {}",qpl_version, max_hw_jobs);
|
||||
}
|
||||
|
||||
DeflateQplJobHWPool::~DeflateQplJobHWPool()
|
||||
{
|
||||
qpl_job * job_ptr = nullptr;
|
||||
for (UInt32 index = 0; index < hw_jobs_max_number; ++index)
|
||||
for (UInt32 i = 0; i < max_hw_jobs; ++i)
|
||||
{
|
||||
job_ptr = reinterpret_cast<qpl_job *>(hw_jobs_buffer.get() + index * per_job_size);
|
||||
while (!tryLockJob(index));
|
||||
qpl_job * job_ptr = reinterpret_cast<qpl_job *>(hw_jobs_buffer.get() + i * per_job_size);
|
||||
while (!tryLockJob(i));
|
||||
qpl_fini_job(job_ptr);
|
||||
unLockJob(index);
|
||||
unLockJob(i);
|
||||
}
|
||||
job_pool_ready = false;
|
||||
}
|
||||
@ -97,13 +101,13 @@ qpl_job * DeflateQplJobHWPool::acquireJob(UInt32 & job_id)
|
||||
{
|
||||
index = distribution(random_engine);
|
||||
retry++;
|
||||
if (retry > hw_jobs_max_number)
|
||||
if (retry > max_hw_jobs)
|
||||
{
|
||||
return nullptr;
|
||||
}
|
||||
}
|
||||
job_id = hw_jobs_max_number - index;
|
||||
assert(index < hw_jobs_max_number);
|
||||
job_id = max_hw_jobs - index;
|
||||
assert(index < max_hw_jobs);
|
||||
return reinterpret_cast<qpl_job *>(hw_jobs_buffer.get() + index * per_job_size);
|
||||
}
|
||||
else
|
||||
@ -113,19 +117,19 @@ qpl_job * DeflateQplJobHWPool::acquireJob(UInt32 & job_id)
|
||||
void DeflateQplJobHWPool::releaseJob(UInt32 job_id)
|
||||
{
|
||||
if (isJobPoolReady())
|
||||
unLockJob(hw_jobs_max_number - job_id);
|
||||
unLockJob(max_hw_jobs - job_id);
|
||||
}
|
||||
|
||||
bool DeflateQplJobHWPool::tryLockJob(UInt32 index)
|
||||
{
|
||||
bool expected = false;
|
||||
assert(index < hw_jobs_max_number);
|
||||
assert(index < max_hw_jobs);
|
||||
return hw_job_ptr_locks[index].compare_exchange_strong(expected, true);
|
||||
}
|
||||
|
||||
void DeflateQplJobHWPool::unLockJob(UInt32 index)
|
||||
{
|
||||
assert(index < hw_jobs_max_number);
|
||||
assert(index < max_hw_jobs);
|
||||
hw_job_ptr_locks[index].store(false);
|
||||
}
|
||||
|
||||
|
@ -34,7 +34,7 @@ private:
|
||||
/// Size of each job object
|
||||
UInt32 per_job_size;
|
||||
/// Maximum jobs running in parallel supported by IAA hardware
|
||||
UInt32 hw_jobs_max_number;
|
||||
UInt32 max_hw_jobs;
|
||||
/// Entire buffer for storing all job objects
|
||||
std::unique_ptr<uint8_t[]> hw_jobs_buffer;
|
||||
/// Locks for accessing each job object pointer
|
||||
|
Loading…
Reference in New Issue
Block a user