mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-20 00:30:49 +00:00
Upgrade IAA-Deflate codec library:qpl from v1.0.0 to v1.1.0
This commit is contained in:
parent
1cbdba0378
commit
260eb5450b
2
contrib/qpl
vendored
2
contrib/qpl
vendored
@ -1 +1 @@
|
|||||||
Subproject commit d75a29d95d8a548297fce3549d21020005364dc8
|
Subproject commit 0bce2b03423f6fbeb8bce66cc8be0bf558058848
|
@ -40,9 +40,10 @@ set (LOG_HW_INIT OFF)
|
|||||||
set (SANITIZE_MEMORY OFF)
|
set (SANITIZE_MEMORY OFF)
|
||||||
set (SANITIZE_THREADS OFF)
|
set (SANITIZE_THREADS OFF)
|
||||||
set (LIB_FUZZING_ENGINE OFF)
|
set (LIB_FUZZING_ENGINE OFF)
|
||||||
|
set (DYNAMIC_LOADING_LIBACCEL_CONFIG OFF)
|
||||||
|
|
||||||
function(GetLibraryVersion _content _outputVar)
|
function(GetLibraryVersion _content _outputVar)
|
||||||
string(REGEX MATCHALL "Qpl VERSION (.+) LANGUAGES" VERSION_REGEX "${_content}")
|
string(REGEX MATCHALL "QPL VERSION (.+) LANGUAGES" VERSION_REGEX "${_content}")
|
||||||
SET(${_outputVar} ${CMAKE_MATCH_1} PARENT_SCOPE)
|
SET(${_outputVar} ${CMAKE_MATCH_1} PARENT_SCOPE)
|
||||||
endfunction()
|
endfunction()
|
||||||
|
|
||||||
@ -240,7 +241,9 @@ add_library(core_iaa OBJECT ${HW_PATH_SRC})
|
|||||||
target_include_directories(core_iaa
|
target_include_directories(core_iaa
|
||||||
PRIVATE ${UUID_DIR}
|
PRIVATE ${UUID_DIR}
|
||||||
PUBLIC $<BUILD_INTERFACE:${QPL_SRC_DIR}/core-iaa/include>
|
PUBLIC $<BUILD_INTERFACE:${QPL_SRC_DIR}/core-iaa/include>
|
||||||
PRIVATE $<BUILD_INTERFACE:${QPL_SRC_DIR}/core-iaa/sources/include>
|
PUBLIC $<BUILD_INTERFACE:${QPL_SRC_DIR}/core-iaa/sources/include>
|
||||||
|
PRIVATE $<BUILD_INTERFACE:${QPL_PROJECT_DIR}/include> # status.h in own_checkers.h
|
||||||
|
PRIVATE $<BUILD_INTERFACE:${QPL_PROJECT_DIR}/sources/c_api> # own_checkers.h
|
||||||
PRIVATE $<TARGET_PROPERTY:qplcore_avx512,INTERFACE_INCLUDE_DIRECTORIES>)
|
PRIVATE $<TARGET_PROPERTY:qplcore_avx512,INTERFACE_INCLUDE_DIRECTORIES>)
|
||||||
|
|
||||||
target_compile_options(core_iaa
|
target_compile_options(core_iaa
|
||||||
@ -339,4 +342,7 @@ target_link_libraries(_qpl
|
|||||||
PRIVATE ${CMAKE_DL_LIBS})
|
PRIVATE ${CMAKE_DL_LIBS})
|
||||||
|
|
||||||
add_library (ch_contrib::qpl ALIAS _qpl)
|
add_library (ch_contrib::qpl ALIAS _qpl)
|
||||||
target_include_directories(_qpl SYSTEM BEFORE PUBLIC "${QPL_PROJECT_DIR}/include")
|
target_include_directories(_qpl SYSTEM BEFORE
|
||||||
|
PUBLIC "${QPL_PROJECT_DIR}/include"
|
||||||
|
PUBLIC "${LIBACCEL_SOURCE_DIR}/accfg"
|
||||||
|
PUBLIC ${UUID_DIR})
|
||||||
|
@ -7,6 +7,7 @@
|
|||||||
#include <Parsers/ASTIdentifier.h>
|
#include <Parsers/ASTIdentifier.h>
|
||||||
#include <Poco/Logger.h>
|
#include <Poco/Logger.h>
|
||||||
#include <Common/logger_useful.h>
|
#include <Common/logger_useful.h>
|
||||||
|
#include "libaccel_config.h"
|
||||||
|
|
||||||
namespace DB
|
namespace DB
|
||||||
{
|
{
|
||||||
@ -16,11 +17,6 @@ namespace ErrorCodes
|
|||||||
extern const int CANNOT_DECOMPRESS;
|
extern const int CANNOT_DECOMPRESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
std::array<qpl_job *, DeflateQplJobHWPool::MAX_HW_JOB_NUMBER> DeflateQplJobHWPool::hw_job_ptr_pool;
|
|
||||||
std::array<std::atomic_bool, DeflateQplJobHWPool::MAX_HW_JOB_NUMBER> DeflateQplJobHWPool::hw_job_ptr_locks;
|
|
||||||
bool DeflateQplJobHWPool::job_pool_ready = false;
|
|
||||||
std::unique_ptr<uint8_t[]> DeflateQplJobHWPool::hw_jobs_buffer;
|
|
||||||
|
|
||||||
DeflateQplJobHWPool & DeflateQplJobHWPool::instance()
|
DeflateQplJobHWPool & DeflateQplJobHWPool::instance()
|
||||||
{
|
{
|
||||||
static DeflateQplJobHWPool pool;
|
static DeflateQplJobHWPool pool;
|
||||||
@ -28,47 +24,65 @@ DeflateQplJobHWPool & DeflateQplJobHWPool::instance()
|
|||||||
}
|
}
|
||||||
|
|
||||||
DeflateQplJobHWPool::DeflateQplJobHWPool()
|
DeflateQplJobHWPool::DeflateQplJobHWPool()
|
||||||
: random_engine(std::random_device()())
|
: hw_jobs_max_number(0)
|
||||||
, distribution(0, MAX_HW_JOB_NUMBER - 1)
|
, random_engine(std::random_device()())
|
||||||
{
|
{
|
||||||
Poco::Logger * log = &Poco::Logger::get("DeflateQplJobHWPool");
|
Poco::Logger * log = &Poco::Logger::get("DeflateQplJobHWPool");
|
||||||
UInt32 job_size = 0;
|
|
||||||
const char * qpl_version = qpl_get_library_version();
|
const char * qpl_version = qpl_get_library_version();
|
||||||
|
|
||||||
/// Get size required for saving a single qpl job object
|
// loop all configured workqueue size to get maximum job number.
|
||||||
qpl_get_job_size(qpl_path_hardware, &job_size);
|
accfg_ctx *ctx_ptr = nullptr;
|
||||||
/// Allocate entire buffer for storing all job objects
|
auto ctx_status = accfg_new(&ctx_ptr);
|
||||||
hw_jobs_buffer = std::make_unique<uint8_t[]>(job_size * MAX_HW_JOB_NUMBER);
|
if (ctx_status == 0)
|
||||||
/// Initialize pool for storing all job object pointers
|
|
||||||
/// Reallocate buffer by shifting address offset for each job object.
|
|
||||||
for (UInt32 index = 0; index < MAX_HW_JOB_NUMBER; ++index)
|
|
||||||
{
|
{
|
||||||
qpl_job * qpl_job_ptr = reinterpret_cast<qpl_job *>(hw_jobs_buffer.get() + index * job_size);
|
auto *dev_ptr = accfg_device_get_first(ctx_ptr);
|
||||||
if (auto status = qpl_init_job(qpl_path_hardware, qpl_job_ptr); status != QPL_STS_OK)
|
while (nullptr != dev_ptr) { // loop all devices
|
||||||
|
for (auto *wq_ptr = accfg_wq_get_first(dev_ptr); \
|
||||||
|
wq_ptr != nullptr; \
|
||||||
|
wq_ptr = accfg_wq_get_next(wq_ptr))
|
||||||
|
hw_jobs_max_number += accfg_wq_get_size(wq_ptr);
|
||||||
|
dev_ptr = accfg_device_get_next(dev_ptr);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if(hw_jobs_max_number == 0)
|
||||||
|
{
|
||||||
|
job_pool_ready = false;
|
||||||
|
LOG_WARNING(log, "Initialization of hardware-assisted DeflateQpl codec failed, falling back to software DeflateQpl codec. Please check if Intel In-Memory Analytics Accelerator (IAA) is properly set up. accfg_context_status: {} ,total_wq_size: {} , QPL Version: {}.", ctx_status, hw_jobs_max_number, qpl_version);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
distribution = std::uniform_int_distribution<int>(0, hw_jobs_max_number - 1);
|
||||||
|
/// Get size required for saving a single qpl job object
|
||||||
|
qpl_get_job_size(qpl_path_hardware, &per_job_size);
|
||||||
|
/// Allocate job buffer pool for storing all job objects
|
||||||
|
hw_jobs_buffer = std::make_unique<uint8_t[]>(per_job_size * hw_jobs_max_number);
|
||||||
|
hw_job_ptr_locks = std::make_unique<std::atomic_bool[]>(hw_jobs_max_number);
|
||||||
|
/// Initialize all job objects in job buffer pool
|
||||||
|
for (UInt32 index = 0; index < hw_jobs_max_number; ++index)
|
||||||
|
{
|
||||||
|
qpl_job * job_ptr = reinterpret_cast<qpl_job *>(hw_jobs_buffer.get() + index * per_job_size);
|
||||||
|
if (auto status = qpl_init_job(qpl_path_hardware, job_ptr); status != QPL_STS_OK)
|
||||||
{
|
{
|
||||||
job_pool_ready = false;
|
job_pool_ready = false;
|
||||||
LOG_WARNING(log, "Initialization of hardware-assisted DeflateQpl codec failed: {} , falling back to software DeflateQpl codec. Please check if Intel In-Memory Analytics Accelerator (IAA) is properly set up. QPL Version: {}.", static_cast<UInt32>(status), qpl_version);
|
LOG_WARNING(log, "Initialization of hardware-assisted DeflateQpl codec failed, falling back to software DeflateQpl codec. Please check if Intel In-Memory Analytics Accelerator (IAA) is properly set up. Status code: {}, QPL Version: {}.", static_cast<UInt32>(status), qpl_version);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
hw_job_ptr_pool[index] = qpl_job_ptr;
|
|
||||||
unLockJob(index);
|
unLockJob(index);
|
||||||
}
|
}
|
||||||
|
|
||||||
job_pool_ready = true;
|
job_pool_ready = true;
|
||||||
LOG_DEBUG(log, "Hardware-assisted DeflateQpl codec is ready! QPL Version: {}",qpl_version);
|
LOG_DEBUG(log, "Hardware-assisted DeflateQpl codec is ready! QPL Version: {}, hw_jobs_max_number: {}",qpl_version, hw_jobs_max_number);
|
||||||
}
|
}
|
||||||
|
|
||||||
DeflateQplJobHWPool::~DeflateQplJobHWPool()
|
DeflateQplJobHWPool::~DeflateQplJobHWPool()
|
||||||
{
|
{
|
||||||
for (UInt32 i = 0; i < MAX_HW_JOB_NUMBER; ++i)
|
qpl_job * job_ptr = nullptr;
|
||||||
|
for (UInt32 index = 0; index < hw_jobs_max_number; ++index)
|
||||||
{
|
{
|
||||||
if (hw_job_ptr_pool[i])
|
job_ptr = reinterpret_cast<qpl_job *>(hw_jobs_buffer.get() + index * per_job_size);
|
||||||
{
|
while (!tryLockJob(index));
|
||||||
while (!tryLockJob(i));
|
qpl_fini_job(job_ptr);
|
||||||
qpl_fini_job(hw_job_ptr_pool[i]);
|
unLockJob(index);
|
||||||
unLockJob(i);
|
|
||||||
hw_job_ptr_pool[i] = nullptr;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
job_pool_ready = false;
|
job_pool_ready = false;
|
||||||
}
|
}
|
||||||
@ -83,14 +97,14 @@ qpl_job * DeflateQplJobHWPool::acquireJob(UInt32 & job_id)
|
|||||||
{
|
{
|
||||||
index = distribution(random_engine);
|
index = distribution(random_engine);
|
||||||
retry++;
|
retry++;
|
||||||
if (retry > MAX_HW_JOB_NUMBER)
|
if (retry > hw_jobs_max_number)
|
||||||
{
|
{
|
||||||
return nullptr;
|
return nullptr;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
job_id = MAX_HW_JOB_NUMBER - index;
|
job_id = hw_jobs_max_number - index;
|
||||||
assert(index < MAX_HW_JOB_NUMBER);
|
assert(index < hw_jobs_max_number);
|
||||||
return hw_job_ptr_pool[index];
|
return reinterpret_cast<qpl_job *>(hw_jobs_buffer.get() + index * per_job_size);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
return nullptr;
|
return nullptr;
|
||||||
@ -99,19 +113,19 @@ qpl_job * DeflateQplJobHWPool::acquireJob(UInt32 & job_id)
|
|||||||
void DeflateQplJobHWPool::releaseJob(UInt32 job_id)
|
void DeflateQplJobHWPool::releaseJob(UInt32 job_id)
|
||||||
{
|
{
|
||||||
if (isJobPoolReady())
|
if (isJobPoolReady())
|
||||||
unLockJob(MAX_HW_JOB_NUMBER - job_id);
|
unLockJob(hw_jobs_max_number - job_id);
|
||||||
}
|
}
|
||||||
|
|
||||||
bool DeflateQplJobHWPool::tryLockJob(UInt32 index)
|
bool DeflateQplJobHWPool::tryLockJob(UInt32 index)
|
||||||
{
|
{
|
||||||
bool expected = false;
|
bool expected = false;
|
||||||
assert(index < MAX_HW_JOB_NUMBER);
|
assert(index < hw_jobs_max_number);
|
||||||
return hw_job_ptr_locks[index].compare_exchange_strong(expected, true);
|
return hw_job_ptr_locks[index].compare_exchange_strong(expected, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
void DeflateQplJobHWPool::unLockJob(UInt32 index)
|
void DeflateQplJobHWPool::unLockJob(UInt32 index)
|
||||||
{
|
{
|
||||||
assert(index < MAX_HW_JOB_NUMBER);
|
assert(index < hw_jobs_max_number);
|
||||||
hw_job_ptr_locks[index].store(false);
|
hw_job_ptr_locks[index].store(false);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -24,22 +24,23 @@ public:
|
|||||||
static DeflateQplJobHWPool & instance();
|
static DeflateQplJobHWPool & instance();
|
||||||
|
|
||||||
qpl_job * acquireJob(UInt32 & job_id);
|
qpl_job * acquireJob(UInt32 & job_id);
|
||||||
static void releaseJob(UInt32 job_id);
|
void releaseJob(UInt32 job_id);
|
||||||
static const bool & isJobPoolReady() { return job_pool_ready; }
|
const bool & isJobPoolReady() { return job_pool_ready; }
|
||||||
|
|
||||||
private:
|
private:
|
||||||
static bool tryLockJob(UInt32 index);
|
bool tryLockJob(UInt32 index);
|
||||||
static void unLockJob(UInt32 index);
|
void unLockJob(UInt32 index);
|
||||||
|
|
||||||
|
/// size of each job objects
|
||||||
|
UInt32 per_job_size;
|
||||||
/// Maximum jobs running in parallel supported by IAA hardware
|
/// Maximum jobs running in parallel supported by IAA hardware
|
||||||
static constexpr auto MAX_HW_JOB_NUMBER = 1024;
|
UInt32 hw_jobs_max_number;
|
||||||
/// Entire buffer for storing all job objects
|
/// Entire buffer for storing all job objects
|
||||||
static std::unique_ptr<uint8_t[]> hw_jobs_buffer;
|
std::unique_ptr<uint8_t[]> hw_jobs_buffer;
|
||||||
/// Job pool for storing all job object pointers
|
|
||||||
static std::array<qpl_job *, MAX_HW_JOB_NUMBER> hw_job_ptr_pool;
|
|
||||||
/// Locks for accessing each job object pointers
|
/// Locks for accessing each job object pointers
|
||||||
static std::array<std::atomic_bool, MAX_HW_JOB_NUMBER> hw_job_ptr_locks;
|
std::unique_ptr<std::atomic_bool[]> hw_job_ptr_locks;
|
||||||
static bool job_pool_ready;
|
|
||||||
|
bool job_pool_ready;
|
||||||
std::mt19937 random_engine;
|
std::mt19937 random_engine;
|
||||||
std::uniform_int_distribution<int> distribution;
|
std::uniform_int_distribution<int> distribution;
|
||||||
};
|
};
|
||||||
|
Loading…
Reference in New Issue
Block a user