diff --git a/contrib/qpl b/contrib/qpl index d75a29d95d8..0bce2b03423 160000 --- a/contrib/qpl +++ b/contrib/qpl @@ -1 +1 @@ -Subproject commit d75a29d95d8a548297fce3549d21020005364dc8 +Subproject commit 0bce2b03423f6fbeb8bce66cc8be0bf558058848 diff --git a/contrib/qpl-cmake/CMakeLists.txt b/contrib/qpl-cmake/CMakeLists.txt index fc5548b0652..334731d105f 100644 --- a/contrib/qpl-cmake/CMakeLists.txt +++ b/contrib/qpl-cmake/CMakeLists.txt @@ -40,9 +40,10 @@ set (LOG_HW_INIT OFF) set (SANITIZE_MEMORY OFF) set (SANITIZE_THREADS OFF) set (LIB_FUZZING_ENGINE OFF) +set (DYNAMIC_LOADING_LIBACCEL_CONFIG OFF) function(GetLibraryVersion _content _outputVar) - string(REGEX MATCHALL "Qpl VERSION (.+) LANGUAGES" VERSION_REGEX "${_content}") + string(REGEX MATCHALL "QPL VERSION (.+) LANGUAGES" VERSION_REGEX "${_content}") SET(${_outputVar} ${CMAKE_MATCH_1} PARENT_SCOPE) endfunction() @@ -240,7 +241,9 @@ add_library(core_iaa OBJECT ${HW_PATH_SRC}) target_include_directories(core_iaa PRIVATE ${UUID_DIR} PUBLIC $ - PRIVATE $ + PUBLIC $ + PRIVATE $ # status.h in own_checkers.h + PRIVATE $ # own_checkers.h PRIVATE $) target_compile_options(core_iaa @@ -339,4 +342,7 @@ target_link_libraries(_qpl PRIVATE ${CMAKE_DL_LIBS}) add_library (ch_contrib::qpl ALIAS _qpl) -target_include_directories(_qpl SYSTEM BEFORE PUBLIC "${QPL_PROJECT_DIR}/include") +target_include_directories(_qpl SYSTEM BEFORE + PUBLIC "${QPL_PROJECT_DIR}/include" + PUBLIC "${LIBACCEL_SOURCE_DIR}/accfg" + PUBLIC ${UUID_DIR}) diff --git a/src/Compression/CompressionCodecDeflateQpl.cpp b/src/Compression/CompressionCodecDeflateQpl.cpp index 29d90b7dbd6..16c1d285dbd 100644 --- a/src/Compression/CompressionCodecDeflateQpl.cpp +++ b/src/Compression/CompressionCodecDeflateQpl.cpp @@ -7,6 +7,7 @@ #include #include #include +#include "libaccel_config.h" namespace DB { @@ -16,11 +17,6 @@ namespace ErrorCodes extern const int CANNOT_DECOMPRESS; } -std::array DeflateQplJobHWPool::hw_job_ptr_pool; -std::array DeflateQplJobHWPool::hw_job_ptr_locks; -bool DeflateQplJobHWPool::job_pool_ready = false; -std::unique_ptr DeflateQplJobHWPool::hw_jobs_buffer; - DeflateQplJobHWPool & DeflateQplJobHWPool::instance() { static DeflateQplJobHWPool pool; @@ -28,47 +24,65 @@ DeflateQplJobHWPool & DeflateQplJobHWPool::instance() } DeflateQplJobHWPool::DeflateQplJobHWPool() - : random_engine(std::random_device()()) - , distribution(0, MAX_HW_JOB_NUMBER - 1) + : hw_jobs_max_number(0) + , random_engine(std::random_device()()) { Poco::Logger * log = &Poco::Logger::get("DeflateQplJobHWPool"); - UInt32 job_size = 0; const char * qpl_version = qpl_get_library_version(); - /// Get size required for saving a single qpl job object - qpl_get_job_size(qpl_path_hardware, &job_size); - /// Allocate entire buffer for storing all job objects - hw_jobs_buffer = std::make_unique(job_size * MAX_HW_JOB_NUMBER); - /// Initialize pool for storing all job object pointers - /// Reallocate buffer by shifting address offset for each job object. - for (UInt32 index = 0; index < MAX_HW_JOB_NUMBER; ++index) + // loop all configured workqueue size to get maximum job number. + accfg_ctx *ctx_ptr = nullptr; + auto ctx_status = accfg_new(&ctx_ptr); + if (ctx_status == 0) { - qpl_job * qpl_job_ptr = reinterpret_cast(hw_jobs_buffer.get() + index * job_size); - if (auto status = qpl_init_job(qpl_path_hardware, qpl_job_ptr); status != QPL_STS_OK) + auto *dev_ptr = accfg_device_get_first(ctx_ptr); + while (nullptr != dev_ptr) { // loop all devices + for (auto *wq_ptr = accfg_wq_get_first(dev_ptr); \ + wq_ptr != nullptr; \ + wq_ptr = accfg_wq_get_next(wq_ptr)) + hw_jobs_max_number += accfg_wq_get_size(wq_ptr); + dev_ptr = accfg_device_get_next(dev_ptr); + } + } + + if(hw_jobs_max_number == 0) + { + job_pool_ready = false; + LOG_WARNING(log, "Initialization of hardware-assisted DeflateQpl codec failed, falling back to software DeflateQpl codec. Please check if Intel In-Memory Analytics Accelerator (IAA) is properly set up. accfg_context_status: {} ,total_wq_size: {} , QPL Version: {}.", ctx_status, hw_jobs_max_number, qpl_version); + return; + } + distribution = std::uniform_int_distribution(0, hw_jobs_max_number - 1); + /// Get size required for saving a single qpl job object + qpl_get_job_size(qpl_path_hardware, &per_job_size); + /// Allocate job buffer pool for storing all job objects + hw_jobs_buffer = std::make_unique(per_job_size * hw_jobs_max_number); + hw_job_ptr_locks = std::make_unique(hw_jobs_max_number); + /// Initialize all job objects in job buffer pool + for (UInt32 index = 0; index < hw_jobs_max_number; ++index) + { + qpl_job * job_ptr = reinterpret_cast(hw_jobs_buffer.get() + index * per_job_size); + if (auto status = qpl_init_job(qpl_path_hardware, job_ptr); status != QPL_STS_OK) { job_pool_ready = false; - LOG_WARNING(log, "Initialization of hardware-assisted DeflateQpl codec failed: {} , falling back to software DeflateQpl codec. Please check if Intel In-Memory Analytics Accelerator (IAA) is properly set up. QPL Version: {}.", static_cast(status), qpl_version); + LOG_WARNING(log, "Initialization of hardware-assisted DeflateQpl codec failed, falling back to software DeflateQpl codec. Please check if Intel In-Memory Analytics Accelerator (IAA) is properly set up. Status code: {}, QPL Version: {}.", static_cast(status), qpl_version); return; } - hw_job_ptr_pool[index] = qpl_job_ptr; unLockJob(index); } job_pool_ready = true; - LOG_DEBUG(log, "Hardware-assisted DeflateQpl codec is ready! QPL Version: {}",qpl_version); + LOG_DEBUG(log, "Hardware-assisted DeflateQpl codec is ready! QPL Version: {}, hw_jobs_max_number: {}",qpl_version, hw_jobs_max_number); } DeflateQplJobHWPool::~DeflateQplJobHWPool() { - for (UInt32 i = 0; i < MAX_HW_JOB_NUMBER; ++i) + qpl_job * job_ptr = nullptr; + for (UInt32 index = 0; index < hw_jobs_max_number; ++index) { - if (hw_job_ptr_pool[i]) - { - while (!tryLockJob(i)); - qpl_fini_job(hw_job_ptr_pool[i]); - unLockJob(i); - hw_job_ptr_pool[i] = nullptr; - } + job_ptr = reinterpret_cast(hw_jobs_buffer.get() + index * per_job_size); + while (!tryLockJob(index)); + qpl_fini_job(job_ptr); + unLockJob(index); } job_pool_ready = false; } @@ -83,14 +97,14 @@ qpl_job * DeflateQplJobHWPool::acquireJob(UInt32 & job_id) { index = distribution(random_engine); retry++; - if (retry > MAX_HW_JOB_NUMBER) + if (retry > hw_jobs_max_number) { return nullptr; } } - job_id = MAX_HW_JOB_NUMBER - index; - assert(index < MAX_HW_JOB_NUMBER); - return hw_job_ptr_pool[index]; + job_id = hw_jobs_max_number - index; + assert(index < hw_jobs_max_number); + return reinterpret_cast(hw_jobs_buffer.get() + index * per_job_size); } else return nullptr; @@ -99,19 +113,19 @@ qpl_job * DeflateQplJobHWPool::acquireJob(UInt32 & job_id) void DeflateQplJobHWPool::releaseJob(UInt32 job_id) { if (isJobPoolReady()) - unLockJob(MAX_HW_JOB_NUMBER - job_id); + unLockJob(hw_jobs_max_number - job_id); } bool DeflateQplJobHWPool::tryLockJob(UInt32 index) { bool expected = false; - assert(index < MAX_HW_JOB_NUMBER); + assert(index < hw_jobs_max_number); return hw_job_ptr_locks[index].compare_exchange_strong(expected, true); } void DeflateQplJobHWPool::unLockJob(UInt32 index) { - assert(index < MAX_HW_JOB_NUMBER); + assert(index < hw_jobs_max_number); hw_job_ptr_locks[index].store(false); } diff --git a/src/Compression/CompressionCodecDeflateQpl.h b/src/Compression/CompressionCodecDeflateQpl.h index 3171a898311..866b88aa855 100644 --- a/src/Compression/CompressionCodecDeflateQpl.h +++ b/src/Compression/CompressionCodecDeflateQpl.h @@ -24,22 +24,23 @@ public: static DeflateQplJobHWPool & instance(); qpl_job * acquireJob(UInt32 & job_id); - static void releaseJob(UInt32 job_id); - static const bool & isJobPoolReady() { return job_pool_ready; } + void releaseJob(UInt32 job_id); + const bool & isJobPoolReady() { return job_pool_ready; } private: - static bool tryLockJob(UInt32 index); - static void unLockJob(UInt32 index); + bool tryLockJob(UInt32 index); + void unLockJob(UInt32 index); + /// size of each job objects + UInt32 per_job_size; /// Maximum jobs running in parallel supported by IAA hardware - static constexpr auto MAX_HW_JOB_NUMBER = 1024; + UInt32 hw_jobs_max_number; /// Entire buffer for storing all job objects - static std::unique_ptr hw_jobs_buffer; - /// Job pool for storing all job object pointers - static std::array hw_job_ptr_pool; + std::unique_ptr hw_jobs_buffer; /// Locks for accessing each job object pointers - static std::array hw_job_ptr_locks; - static bool job_pool_ready; + std::unique_ptr hw_job_ptr_locks; + + bool job_pool_ready; std::mt19937 random_engine; std::uniform_int_distribution distribution; };