Merge pull request #39494 from ClickHouse/iaa

Add Intel®-IAA/QPL-based DEFLATE_QPL Codec
This commit is contained in:
Robert Schulze 2022-07-22 13:40:04 +02:00 committed by GitHub
commit 05d6408f87
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
22 changed files with 1074 additions and 34 deletions

3
.gitmodules vendored
View File

@ -259,6 +259,9 @@
[submodule "contrib/minizip-ng"]
path = contrib/minizip-ng
url = https://github.com/zlib-ng/minizip-ng
[submodule "contrib/qpl"]
path = contrib/qpl
url = https://github.com/intel/qpl.git
[submodule "contrib/wyhash"]
path = contrib/wyhash
url = https://github.com/wangyi-fudan/wyhash.git

View File

@ -158,6 +158,7 @@ add_contrib (sqlite-cmake sqlite-amalgamation)
add_contrib (s2geometry-cmake s2geometry)
add_contrib (base-x-cmake base-x)
add_contrib(c-ares-cmake c-ares)
add_contrib (qpl-cmake qpl)
# Put all targets defined here and in subdirectories under "contrib/<immediate-subdir>" folders in GUI-based IDEs.
# Some of third-party projects may override CMAKE_FOLDER or FOLDER property of their targets, so they would not appear

1
contrib/qpl vendored Submodule

@ -0,0 +1 @@
Subproject commit cdc8442f7a5e7a6ff6eea39c69665e0c5034d85d

View File

@ -0,0 +1,322 @@
## The Intel® QPL provides high-performance implementations of data processing functions for the hardware accelerator, with a software path that is used when the hardware accelerator is not available.
# The ENABLE_QPL option is only offered on x86_64 Linux with the AVX2 or AVX512 code paths enabled.
if (OS_LINUX AND ARCH_AMD64 AND (ENABLE_AVX2 OR ENABLE_AVX512))
    option (ENABLE_QPL "Enable Intel® Query Processing Library" ${ENABLE_LIBRARIES})
elseif (ENABLE_QPL)
    # ENABLE_QPL was requested explicitly on an unsupported platform. Report it and, if the message
    # level is not fatal, switch QPL off instead of attempting a build that cannot succeed.
    message (${RECONFIGURE_MESSAGE_LEVEL} "QPL library is only supported on x86_64 arch with avx2/avx512 support")
    # A normal variable shadows the cache entry for this directory only; the user's cache value is untouched.
    set (ENABLE_QPL OFF)
endif ()

# Nothing else in this file applies when QPL is disabled.
if (NOT ENABLE_QPL)
    message (STATUS "Not using QPL")
    return ()
endif ()
# Root of the QPL submodule and of its sources.
set (QPL_PROJECT_DIR "${ClickHouse_SOURCE_DIR}/contrib/qpl")
set (QPL_SRC_DIR "${ClickHouse_SOURCE_DIR}/contrib/qpl/sources")
# Build-tree directory for QPL; the "generated" sources globbed further down are looked up underneath it.
set (QPL_BINARY_DIR "${ClickHouse_BINARY_DIR}/build/contrib/qpl")
# Added as a private include directory to core_iaa and middle_layer_lib further down.
# NOTE(review): presumably this is where the stub uuid header is found — confirm.
set (UUID_DIR "${ClickHouse_SOURCE_DIR}/contrib/qpl-cmake")
# Build switches. These three are turned into compile definitions further down
# (QPL_EFFICIENT_WAIT, BLOCK_ON_FAULT_ENABLED and LOG_HW_INIT respectively).
set (EFFICIENT_WAIT ON)
set (BLOCK_ON_FAULT ON)
set (LOG_HW_INIT OFF)
# Not referenced again in this file.
# NOTE(review): presumably read by QPL's cmake/CompileOptions.cmake, which is included later — confirm.
set (SANITIZE_MEMORY OFF)
set (SANITIZE_THREADS OFF)
set (LIB_FUZZING_ENGINE OFF)
# Extract the QPL version from the text of QPL's top-level CMakeLists.txt.
#
#   _content   - text to search; expected to contain "Qpl VERSION <version> LANGUAGES".
#   _outputVar - name of the variable in the caller's scope that receives <version>,
#                or an empty string when the pattern is not found.
function(GetLibraryVersion _content _outputVar)
    # REGEX MATCH (not MATCHALL) is the form documented to populate CMAKE_MATCH_<n>.
    # The version is captured as a single whitespace-free token so that a later
    # occurrence of " LANGUAGES" in the text cannot widen the capture.
    string(REGEX MATCH "Qpl VERSION ([^ \t\r\n]+) LANGUAGES" version_match "${_content}")
    if (NOT "${version_match}" STREQUAL "")
        set(${_outputVar} "${CMAKE_MATCH_1}" PARENT_SCOPE)
    else()
        set(${_outputVar} "" PARENT_SCOPE)
    endif()
endfunction()
# Read QPL's top-level CMakeLists.txt, extract the version it declares and report it.
file (READ "${QPL_PROJECT_DIR}/CMakeLists.txt" HEADER_CONTENT)
GetLibraryVersion ("${HEADER_CONTENT}" QPL_VERSION)
message (STATUS "Intel QPL version: ${QPL_VERSION}")
# There are 5 source subdirectories under $QPL_SRC_DIR: isal, core-sw, core-iaa, middle-layer, c_api.
# Generate 7 library targets: isal, isal_asm, qplcore_px, qplcore_avx512, core_iaa, middle_layer_lib, _qpl.
# Output ch_contrib::qpl as an alias of _qpl, which bundles the objects of the other 6 targets.
# Pull in the compile-option helpers shipped with QPL.
# NOTE(review): presumably this defines the QPL_LINUX_TOOLCHAIN_* variables used below — confirm.
include ("${QPL_PROJECT_DIR}/cmake/CompileOptions.cmake")

# The ISA-L assembly sources need NASM; stop early with a clear message if it is missing.
include (CheckLanguage)
check_language (ASM_NASM)
if (NOT CMAKE_ASM_NASM_COMPILER)
    message (FATAL_ERROR "Please install NASM from 'https://www.nasm.us/' because NASM compiler can not be found!")
endif ()
# [SUBDIR]isal
enable_language (ASM_NASM)

# C sources of ISA-L (igzip and crc).
set (ISAL_C_SRC
    ${QPL_SRC_DIR}/isal/igzip/adler32_base.c
    ${QPL_SRC_DIR}/isal/igzip/huff_codes.c
    ${QPL_SRC_DIR}/isal/igzip/hufftables_c.c
    ${QPL_SRC_DIR}/isal/igzip/igzip.c
    ${QPL_SRC_DIR}/isal/igzip/igzip_base.c
    ${QPL_SRC_DIR}/isal/igzip/flatten_ll.c
    ${QPL_SRC_DIR}/isal/igzip/encode_df.c
    ${QPL_SRC_DIR}/isal/igzip/igzip_icf_base.c
    ${QPL_SRC_DIR}/isal/igzip/igzip_inflate.c
    ${QPL_SRC_DIR}/isal/igzip/igzip_icf_body.c
    ${QPL_SRC_DIR}/isal/crc/crc_base.c
    ${QPL_SRC_DIR}/isal/crc/crc64_base.c)

# NASM sources of ISA-L (igzip and crc).
set (ISAL_ASM_SRC
    ${QPL_SRC_DIR}/isal/igzip/igzip_body.asm
    ${QPL_SRC_DIR}/isal/igzip/igzip_gen_icf_map_lh1_04.asm
    ${QPL_SRC_DIR}/isal/igzip/igzip_gen_icf_map_lh1_06.asm
    ${QPL_SRC_DIR}/isal/igzip/igzip_decode_block_stateless_04.asm
    ${QPL_SRC_DIR}/isal/igzip/igzip_finish.asm
    ${QPL_SRC_DIR}/isal/igzip/encode_df_04.asm
    ${QPL_SRC_DIR}/isal/igzip/encode_df_06.asm
    ${QPL_SRC_DIR}/isal/igzip/igzip_decode_block_stateless_01.asm
    ${QPL_SRC_DIR}/isal/igzip/proc_heap.asm
    ${QPL_SRC_DIR}/isal/igzip/igzip_icf_body_h1_gr_bt.asm
    ${QPL_SRC_DIR}/isal/igzip/igzip_icf_finish.asm
    ${QPL_SRC_DIR}/isal/igzip/igzip_inflate_multibinary.asm
    ${QPL_SRC_DIR}/isal/igzip/igzip_update_histogram_01.asm
    ${QPL_SRC_DIR}/isal/igzip/igzip_update_histogram_04.asm
    ${QPL_SRC_DIR}/isal/igzip/rfc1951_lookup.asm
    ${QPL_SRC_DIR}/isal/igzip/adler32_sse.asm
    ${QPL_SRC_DIR}/isal/igzip/adler32_avx2_4.asm
    ${QPL_SRC_DIR}/isal/igzip/igzip_deflate_hash.asm
    ${QPL_SRC_DIR}/isal/igzip/igzip_set_long_icf_fg_04.asm
    ${QPL_SRC_DIR}/isal/igzip/igzip_set_long_icf_fg_06.asm
    ${QPL_SRC_DIR}/isal/igzip/igzip_multibinary.asm
    ${QPL_SRC_DIR}/isal/igzip/stdmac.asm
    ${QPL_SRC_DIR}/isal/crc/crc_multibinary.asm
    ${QPL_SRC_DIR}/isal/crc/crc32_gzip_refl_by8.asm
    ${QPL_SRC_DIR}/isal/crc/crc32_gzip_refl_by8_02.asm
    ${QPL_SRC_DIR}/isal/crc/crc32_gzip_refl_by16_10.asm
    ${QPL_SRC_DIR}/isal/crc/crc32_ieee_01.asm
    ${QPL_SRC_DIR}/isal/crc/crc32_ieee_02.asm
    ${QPL_SRC_DIR}/isal/crc/crc32_ieee_by4.asm
    ${QPL_SRC_DIR}/isal/crc/crc32_ieee_by16_10.asm
    ${QPL_SRC_DIR}/isal/crc/crc32_iscsi_00.asm
    ${QPL_SRC_DIR}/isal/crc/crc32_iscsi_01.asm
    ${QPL_SRC_DIR}/isal/crc/crc32_iscsi_by16_10.asm)

# ISA-L is built as two object libraries: one for the C sources, one for the NASM sources.
add_library (isal OBJECT ${ISAL_C_SRC})
add_library (isal_asm OBJECT ${ISAL_ASM_SRC})
# Setting external and internal interfaces for ISA-L library
# Include directories of the C part of ISA-L; the PUBLIC ones are also seen by targets
# that read isal's INTERFACE_INCLUDE_DIRECTORIES.
target_include_directories (isal
    PUBLIC $<BUILD_INTERFACE:${QPL_SRC_DIR}/isal/include>
    PRIVATE ${QPL_SRC_DIR}/isal/include
    PUBLIC ${QPL_SRC_DIR}/isal/igzip)

# Toolchain flags from QPL are applied for GCC only; the per-configuration entries are empty.
target_compile_options (isal PRIVATE
    "$<$<C_COMPILER_ID:GNU>:${QPL_LINUX_TOOLCHAIN_REQUIRED_FLAGS}>"
    "$<$<CONFIG:Debug>:>"
    "$<$<CONFIG:Release>:>")

# The assembler receives its include paths and the QPL_LIB define as plain command-line options.
target_compile_options (isal_asm
    PUBLIC "-I${QPL_SRC_DIR}/isal/include/"
    PUBLIC "-I${QPL_SRC_DIR}/isal/igzip/"
    PUBLIC "-I${QPL_SRC_DIR}/isal/crc/"
    PUBLIC "-DQPL_LIB")
# AS_FEATURE_LEVEL=10 means "Check SIMD capabilities of the target system at runtime and use up to AVX512 if available".
# AS_FEATURE_LEVEL=5 means "Check SIMD capabilities of the target system at runtime and use up to AVX2 if available".
# HAVE_AS_KNOWS_AVX512 means rely on AVX512 being available on the target system.
# Select the feature level passed to the assembler depending on whether AVX512 is enabled.
if (ENABLE_AVX512)
    target_compile_options (isal_asm PUBLIC "-DHAVE_AS_KNOWS_AVX512" "-DAS_FEATURE_LEVEL=10")
else ()
    target_compile_options (isal_asm PUBLIC "-DAS_FEATURE_LEVEL=5")
endif ()

# "-fno-sanitize=undefined" has to be removed from COMPILE_OPTIONS of the assembly target:
# nasm does not recognize the flag and would fail otherwise.
if (SANITIZE STREQUAL "undefined")
    get_target_property (target_options isal_asm COMPILE_OPTIONS)
    list (REMOVE_ITEM target_options "-fno-sanitize=undefined")
    set_property (TARGET isal_asm PROPERTY COMPILE_OPTIONS ${target_options})
endif ()

# Definitions for the C part of ISA-L; PUBLIC so they are part of the target's interface.
target_compile_definitions (isal
    PUBLIC QPL_LIB
    PUBLIC NDEBUG)
# [SUBDIR]core-sw
# Two libraries:qplcore_avx512/qplcore_px for SW fallback will be created which are implemented by AVX512 and non-AVX512 instructions respectively.
# The upper level QPL API will check SIMD capabilities of the target system at runtime and decide to call AVX512 function or non-AVX512 function.
# Hence, here we don't need put qplcore_avx512 under an ENABLE_AVX512 CMake switch.
# Actually, if we do that, some undefined symbols errors would happen because both of AVX512 function and non-AVX512 function are referenced by QPL API.
# PLATFORM=2 means AVX512 implementation; PLATFORM=0 means non-AVX512 implementation.
# Find Core Sources
# Sources shared by both core-sw flavours.
file (GLOB SOURCES
    ${QPL_SRC_DIR}/core-sw/src/checksums/*.c
    ${QPL_SRC_DIR}/core-sw/src/filtering/*.c
    ${QPL_SRC_DIR}/core-sw/src/other/*.c
    ${QPL_SRC_DIR}/core-sw/src/compression/*.c)

# Data tables; only added to the qplcore_px library below.
file (GLOB DATA_SOURCES
    ${QPL_SRC_DIR}/core-sw/src/data/*.c)
# Create avx512 library
# Object library with the core-sw sources compiled as the AVX512 implementation (PLATFORM=2).
add_library(qplcore_avx512 OBJECT ${SOURCES})
target_compile_definitions(qplcore_avx512 PRIVATE PLATFORM=2)
target_include_directories(qplcore_avx512
    PUBLIC $<BUILD_INTERFACE:${QPL_SRC_DIR}/core-sw/include>
    PUBLIC $<BUILD_INTERFACE:${QPL_SRC_DIR}/core-sw/src/include>
    PUBLIC $<BUILD_INTERFACE:${QPL_SRC_DIR}/core-sw/src/compression/include>
    PRIVATE $<TARGET_PROPERTY:isal,INTERFACE_INCLUDE_DIRECTORIES>)

# Request C17 for GCC only. Generator expressions are not evaluated in property names,
# so the compiler check must be done at configure time. C_STANDARD 17 is accepted
# starting with CMake 3.21; older versions keep the compiler default.
if (CMAKE_C_COMPILER_ID STREQUAL "GNU" AND NOT CMAKE_VERSION VERSION_LESS 3.21)
    set_target_properties(qplcore_avx512 PROPERTIES C_STANDARD 17)
endif ()

target_link_libraries(qplcore_avx512 ${CMAKE_DL_LIBS} isal)

# The AVX512 flavour is always compiled for skylake-avx512, independent of the global
# architecture flags; the per-configuration Debug entry is intentionally empty.
target_compile_options(qplcore_avx512
    PRIVATE ${QPL_LINUX_TOOLCHAIN_REQUIRED_FLAGS}
    PRIVATE -march=skylake-avx512
    PRIVATE "$<$<CONFIG:Debug>:>"
    PRIVATE "$<$<CONFIG:Release>:-O3;-D_FORTIFY_SOURCE=2>")
target_compile_definitions(qplcore_avx512 PUBLIC QPL_BADARG_CHECK)
#
# Create px library
#
#set(CMAKE_INCLUDE_CURRENT_DIR ON)
# Create library
# Object library with the core-sw sources (plus the data tables) compiled as the
# non-AVX512 implementation (PLATFORM=0).
add_library(qplcore_px OBJECT ${SOURCES} ${DATA_SOURCES})
target_compile_definitions(qplcore_px PRIVATE PLATFORM=0)
target_include_directories(qplcore_px
    PUBLIC $<BUILD_INTERFACE:${QPL_SRC_DIR}/core-sw/include>
    PUBLIC $<BUILD_INTERFACE:${QPL_SRC_DIR}/core-sw/src/include>
    PUBLIC $<BUILD_INTERFACE:${QPL_SRC_DIR}/core-sw/src/compression/include>
    PRIVATE $<TARGET_PROPERTY:isal,INTERFACE_INCLUDE_DIRECTORIES>)

# Request C17 for GCC only. Generator expressions are not evaluated in property names,
# so the compiler check must be done at configure time. C_STANDARD 17 is accepted
# starting with CMake 3.21; older versions keep the compiler default.
if (CMAKE_C_COMPILER_ID STREQUAL "GNU" AND NOT CMAKE_VERSION VERSION_LESS 3.21)
    set_target_properties(qplcore_px PROPERTIES C_STANDARD 17)
endif ()

target_link_libraries(qplcore_px isal ${CMAKE_DL_LIBS})

# The per-configuration Debug entry is intentionally empty.
target_compile_options(qplcore_px
    PRIVATE ${QPL_LINUX_TOOLCHAIN_REQUIRED_FLAGS}
    PRIVATE "$<$<CONFIG:Debug>:>"
    PRIVATE "$<$<CONFIG:Release>:-O3;-D_FORTIFY_SOURCE=2>")
target_compile_definitions(qplcore_px PUBLIC QPL_BADARG_CHECK)
# [SUBDIR]core-iaa
# Sources of the hardware (IAA) path.
file(GLOB HW_PATH_SRC
    ${QPL_SRC_DIR}/core-iaa/sources/aecs/*.c
    ${QPL_SRC_DIR}/core-iaa/sources/aecs/*.cpp
    ${QPL_SRC_DIR}/core-iaa/sources/driver_loader/*.c
    ${QPL_SRC_DIR}/core-iaa/sources/driver_loader/*.cpp
    ${QPL_SRC_DIR}/core-iaa/sources/descriptors/*.c
    ${QPL_SRC_DIR}/core-iaa/sources/descriptors/*.cpp
    ${QPL_SRC_DIR}/core-iaa/sources/bit_rev.c)

# Create library
add_library(core_iaa OBJECT ${HW_PATH_SRC})

target_include_directories(core_iaa
    PRIVATE ${UUID_DIR}
    PUBLIC $<BUILD_INTERFACE:${QPL_SRC_DIR}/core-iaa/include>
    PRIVATE $<BUILD_INTERFACE:${QPL_SRC_DIR}/core-iaa/sources/include>
    PRIVATE $<TARGET_PROPERTY:qplcore_avx512,INTERFACE_INCLUDE_DIRECTORIES>)

# The generator expression is quoted so that it reaches CMake as one argument instead of
# being split at ';' and whitespace and only re-assembled when the property is evaluated.
target_compile_options(core_iaa
    PRIVATE "$<$<C_COMPILER_ID:GNU>:${QPL_LINUX_TOOLCHAIN_REQUIRED_FLAGS};$<$<CONFIG:Release>:-O3;-D_FORTIFY_SOURCE=2>>")

target_compile_features(core_iaa PRIVATE c_std_11)

# BLOCK_ON_FAULT and LOG_HW_INIT are the switches set near the top of this file.
# No whitespace is allowed inside an unquoted generator expression.
target_compile_definitions(core_iaa
    PRIVATE QPL_BADARG_CHECK
    PRIVATE $<$<BOOL:${BLOCK_ON_FAULT}>:BLOCK_ON_FAULT_ENABLED>
    PRIVATE $<$<BOOL:${LOG_HW_INIT}>:LOG_HW_INIT>)
# [SUBDIR]middle-layer
# NOTE(review): generate_unpack_kernel_arrays is not defined in this file; presumably it comes from
# QPL's cmake helpers and writes the px_*/avx512_* sources globbed below — confirm.
generate_unpack_kernel_arrays(${QPL_BINARY_DIR})

file(GLOB MIDDLE_LAYER_SRC
    ${QPL_SRC_DIR}/middle-layer/analytics/*.cpp
    ${QPL_SRC_DIR}/middle-layer/c_wrapper/*.cpp
    ${QPL_SRC_DIR}/middle-layer/checksum/*.cpp
    ${QPL_SRC_DIR}/middle-layer/common/*.cpp
    ${QPL_SRC_DIR}/middle-layer/compression/*.cpp
    ${QPL_SRC_DIR}/middle-layer/compression/*/*.cpp
    ${QPL_SRC_DIR}/middle-layer/compression/*/*/*.cpp
    ${QPL_SRC_DIR}/middle-layer/dispatcher/*.cpp
    ${QPL_SRC_DIR}/middle-layer/other/*.cpp
    ${QPL_SRC_DIR}/middle-layer/util/*.cpp
    ${QPL_SRC_DIR}/middle-layer/inflate/*.cpp
    ${QPL_SRC_DIR}/core-iaa/sources/accelerator/*.cpp) # todo

# Generated table sources, one set per core-sw flavour.
file(GLOB GENERATED_PX_TABLES_SRC ${QPL_BINARY_DIR}/generated/px_*.cpp)
file(GLOB GENERATED_AVX512_TABLES_SRC ${QPL_BINARY_DIR}/generated/avx512_*.cpp)

add_library(middle_layer_lib OBJECT
    ${GENERATED_PX_TABLES_SRC}
    ${GENERATED_AVX512_TABLES_SRC}
    ${MIDDLE_LAYER_SRC})

# The first generator expression is quoted so that it reaches CMake as one argument instead of
# being split at ';' and line breaks and only re-assembled when the property is evaluated.
target_compile_options(middle_layer_lib
    PRIVATE "$<$<C_COMPILER_ID:GNU>:${QPL_LINUX_TOOLCHAIN_REQUIRED_FLAGS};${QPL_LINUX_TOOLCHAIN_DYNAMIC_LIBRARY_FLAGS};$<$<CONFIG:Release>:-O3;-D_FORTIFY_SOURCE=2>>"
    PRIVATE $<$<COMPILE_LANG_AND_ID:CXX,GNU>:${QPL_LINUX_TOOLCHAIN_CPP_EMBEDDED_FLAGS}>)

target_compile_definitions(middle_layer_lib
    PUBLIC QPL_VERSION="${QPL_VERSION}"
    PUBLIC $<$<BOOL:${LOG_HW_INIT}>:LOG_HW_INIT>
    PUBLIC $<$<BOOL:${EFFICIENT_WAIT}>:QPL_EFFICIENT_WAIT>
    PUBLIC QPL_BADARG_CHECK)

# The generated tables are compiled with the PLATFORM value of the flavour they belong to
# (0 = non-AVX512, 2 = AVX512).
set_source_files_properties(${GENERATED_PX_TABLES_SRC} PROPERTIES COMPILE_DEFINITIONS PLATFORM=0)
set_source_files_properties(${GENERATED_AVX512_TABLES_SRC} PROPERTIES COMPILE_DEFINITIONS PLATFORM=2)

# _qpl is created further down; referencing it inside a generator expression is fine because
# generator expressions are evaluated after all targets have been defined.
target_include_directories(middle_layer_lib
    PRIVATE ${UUID_DIR}
    PUBLIC $<BUILD_INTERFACE:${QPL_SRC_DIR}/middle-layer>
    PUBLIC $<TARGET_PROPERTY:_qpl,INTERFACE_INCLUDE_DIRECTORIES>
    PUBLIC $<TARGET_PROPERTY:qplcore_px,INTERFACE_INCLUDE_DIRECTORIES>
    PUBLIC $<TARGET_PROPERTY:qplcore_avx512,INTERFACE_INCLUDE_DIRECTORIES>
    PUBLIC $<TARGET_PROPERTY:isal,INTERFACE_INCLUDE_DIRECTORIES>
    PUBLIC $<TARGET_PROPERTY:core_iaa,INTERFACE_INCLUDE_DIRECTORIES>)

target_compile_definitions(middle_layer_lib PUBLIC -DQPL_LIB)
# [SUBDIR]c_api
file(GLOB_RECURSE QPL_C_API_SRC
    ${QPL_SRC_DIR}/c_api/*.c
    ${QPL_SRC_DIR}/c_api/*.cpp)

# Static library combining the C API sources with the objects of all object libraries above.
# Each object library is listed exactly once.
add_library(_qpl STATIC ${QPL_C_API_SRC}
    $<TARGET_OBJECTS:middle_layer_lib>
    $<TARGET_OBJECTS:isal>
    $<TARGET_OBJECTS:isal_asm>
    $<TARGET_OBJECTS:qplcore_px>
    $<TARGET_OBJECTS:qplcore_avx512>
    $<TARGET_OBJECTS:core_iaa>)

target_include_directories(_qpl
    PUBLIC $<BUILD_INTERFACE:${QPL_PROJECT_DIR}/include/>
    PRIVATE $<TARGET_PROPERTY:middle_layer_lib,INTERFACE_INCLUDE_DIRECTORIES>
    PRIVATE $<BUILD_INTERFACE:${QPL_SRC_DIR}/c_api>)

# The first generator expression is quoted so that it reaches CMake as one argument instead of
# being split at ';' and line breaks and only re-assembled when the property is evaluated.
target_compile_options(_qpl
    PRIVATE "$<$<C_COMPILER_ID:GNU>:${QPL_LINUX_TOOLCHAIN_REQUIRED_FLAGS};${QPL_LINUX_TOOLCHAIN_DYNAMIC_LIBRARY_FLAGS};$<$<CONFIG:Release>:-O3;-D_FORTIFY_SOURCE=2>>"
    PRIVATE $<$<COMPILE_LANG_AND_ID:CXX,GNU>:${QPL_LINUX_TOOLCHAIN_CPP_EMBEDDED_FLAGS}>)

# ENABLE_QPL_COMPRESSION is PUBLIC, so every target linking ch_contrib::qpl is compiled with it.
# (A leading -D is stripped by target_compile_definitions, so it is omitted here.)
target_compile_definitions(_qpl
    PRIVATE QPL_LIB
    PRIVATE QPL_BADARG_CHECK
    PUBLIC ENABLE_QPL_COMPRESSION)

target_link_libraries(_qpl
    PRIVATE ${CMAKE_DL_LIBS})

# Name under which the rest of the build refers to QPL.
add_library (ch_contrib::qpl ALIAS _qpl)

# Expose the public QPL headers to consumers as system headers, ahead of other include paths.
target_include_directories(_qpl SYSTEM BEFORE PUBLIC "${QPL_PROJECT_DIR}/include")

View File

@ -0,0 +1,4 @@
/* Minimal declaration of uuid_t as a 16-byte array.
 * NOTE(review): presumably a stand-in for the system <uuid/uuid.h>, so that the QPL sources
 * can be compiled without libuuid headers — confirm. */
#ifndef _QPL_UUID_UUID_H
#define _QPL_UUID_UUID_H
typedef unsigned char uuid_t[16];
#endif /* _QPL_UUID_UUID_H */

View File

@ -51,6 +51,7 @@ RUN apt-get update \
rename \
software-properties-common \
tzdata \
nasm \
--yes --no-install-recommends \
&& apt-get clean

View File

@ -55,6 +55,7 @@ RUN apt-get update \
pkg-config \
tzdata \
pv \
nasm \
--yes --no-install-recommends
# Sanitizer options for services (clickhouse-server)

View File

@ -72,6 +72,7 @@ RUN apt-get update \
tzdata \
unixodbc \
file \
nasm \
--yes --no-install-recommends
RUN pip3 install numpy scipy pandas Jinja2

View File

@ -45,7 +45,7 @@ Configuration template:
- `min_part_size` The minimum size of a data part.
- `min_part_size_ratio` The ratio of the data part size to the table size.
- `method` Compression method. Acceptable values: `lz4`, `lz4hc`, `zstd`.
- `method` Compression method. Acceptable values: `lz4`, `lz4hc`, `zstd`, `deflate_qpl`.
- `level` Compression level. See [Codecs](../../sql-reference/statements/create/table.md#create-query-general-purpose-codecs).
You can configure multiple `<case>` sections.

View File

@ -248,6 +248,13 @@ ClickHouse supports general purpose codecs and specialized codecs.
High compression levels are useful for asymmetric scenarios, like compress once, decompress repeatedly. Higher levels mean better compression and higher CPU usage.
#### DEFLATE_QPL
`DEFLATE_QPL` — [Deflate compression algorithm](https://github.com/intel/qpl) implemented by Intel® Query Processing Library, which has dependency on Intel Hardware:
- DEFLATE_QPL is only supported on systems with AVX2/AVX512/IAA.
- DEFLATE_QPL-compressed data can only be transferred between nodes with AVX2/AVX512/IAA.
### Specialized Codecs
These codecs are designed to make compression more effective by using specific features of data. Some of these codecs do not compress data themselves. Instead, they prepare the data for a common purpose codec, which compresses it better than without this preparation.

View File

@ -44,7 +44,7 @@ ClickHouse перезагружает встроенные словари с з
- `min_part_size` - Минимальный размер части таблицы.
- `min_part_size_ratio` - Отношение размера минимальной части таблицы к полному размеру таблицы.
- `method` - Метод сжатия. Возможные значения: `lz4`, `lz4hc`, `zstd`.
- `method` - Метод сжатия. Возможные значения: `lz4`, `lz4hc`, `zstd`, `deflate_qpl`.
- `level` Уровень сжатия. См. [Кодеки](../../sql-reference/statements/create/table/#create-query-common-purpose-codecs).
Можно сконфигурировать несколько разделов `<case>`.

View File

@ -79,6 +79,7 @@ int mainEntryClickHouseCompressor(int argc, char ** argv)
("block-size,b", po::value<unsigned>()->default_value(DBMS_DEFAULT_BUFFER_SIZE), "compress in blocks of specified size")
("hc", "use LZ4HC instead of LZ4")
("zstd", "use ZSTD instead of LZ4")
("deflate_qpl", "use deflate_qpl instead of LZ4")
("codec", po::value<std::vector<std::string>>()->multitoken(), "use codecs combination instead of LZ4")
("level", po::value<int>(), "compression level for codecs specified via flags")
("none", "use no compression instead of LZ4")
@ -103,6 +104,7 @@ int mainEntryClickHouseCompressor(int argc, char ** argv)
bool decompress = options.count("decompress");
bool use_lz4hc = options.count("hc");
bool use_zstd = options.count("zstd");
bool use_deflate_qpl = options.count("deflate_qpl");
bool stat_mode = options.count("stat");
bool use_none = options.count("none");
unsigned block_size = options["block-size"].as<unsigned>();
@ -110,7 +112,7 @@ int mainEntryClickHouseCompressor(int argc, char ** argv)
if (options.count("codec"))
codecs = options["codec"].as<std::vector<std::string>>();
if ((use_lz4hc || use_zstd || use_none) && !codecs.empty())
if ((use_lz4hc || use_zstd || use_deflate_qpl || use_none) && !codecs.empty())
throw Exception("Wrong options, codec flags like --zstd and --codec options are mutually exclusive", ErrorCodes::BAD_ARGUMENTS);
if (!codecs.empty() && options.count("level"))
@ -122,6 +124,8 @@ int mainEntryClickHouseCompressor(int argc, char ** argv)
method_family = "LZ4HC";
else if (use_zstd)
method_family = "ZSTD";
else if (use_deflate_qpl)
method_family = "DEFLATE_QPL";
else if (use_none)
method_family = "NONE";

View File

@ -346,6 +346,12 @@ set_source_files_properties(
Columns/ColumnString.cpp
PROPERTIES COMPILE_FLAGS "${X86_INTRINSICS_FLAGS}")
# The DEFLATE_QPL codec source needs -mwaitpkg. Gate on the existence of the QPL target,
# the same condition that is used further down when linking ch_contrib::qpl, so that the
# flag is only added when QPL is really part of the build.
if (TARGET ch_contrib::qpl)
    set_source_files_properties(
        Compression/CompressionCodecDeflateQpl.cpp
        PROPERTIES COMPILE_FLAGS "-mwaitpkg")
endif ()
target_link_libraries(clickhouse_common_io PUBLIC ch_contrib::re2_st)
target_link_libraries(clickhouse_common_io PUBLIC ch_contrib::re2)
@ -530,6 +536,10 @@ endif ()
target_link_libraries (clickhouse_common_io PRIVATE ch_contrib::lz4)
# Link QPL only when its contrib target has been created.
if (TARGET ch_contrib::qpl)
    dbms_target_link_libraries (PUBLIC ch_contrib::qpl)
endif ()
dbms_target_link_libraries(PRIVATE _boost_context)
if (ENABLE_NLP)

View File

@ -106,21 +106,15 @@ static void validateChecksum(char * data, size_t size, const Checksum expected_c
throw Exception(message.str(), ErrorCodes::CHECKSUM_DOESNT_MATCH);
}
/// Read compressed data into compressed_buffer. Get size of decompressed data from block header. Checksum if need.
/// Returns number of compressed bytes read.
size_t CompressedReadBufferBase::readCompressedData(size_t & size_decompressed, size_t & size_compressed_without_checksum, bool always_copy)
static void readHeaderAndGetCodecAndSize(
const char * compressed_buffer,
UInt8 header_size,
CompressionCodecPtr & codec,
size_t & size_decompressed,
size_t & size_compressed_without_checksum,
bool allow_different_codecs)
{
if (compressed_in->eof())
return 0;
UInt8 header_size = ICompressionCodec::getHeaderSize();
own_compressed_buffer.resize(header_size + sizeof(Checksum));
compressed_in->readStrict(own_compressed_buffer.data(), sizeof(Checksum) + header_size);
char * compressed_header = own_compressed_buffer.data() + sizeof(Checksum);
uint8_t method = ICompressionCodec::readMethod(compressed_header);
uint8_t method = ICompressionCodec::readMethod(compressed_buffer);
if (!codec)
{
@ -142,8 +136,8 @@ size_t CompressedReadBufferBase::readCompressedData(size_t & size_decompressed,
}
}
size_compressed_without_checksum = ICompressionCodec::readCompressedBlockSize(compressed_header);
size_decompressed = ICompressionCodec::readDecompressedBlockSize(compressed_header);
size_compressed_without_checksum = ICompressionCodec::readCompressedBlockSize(compressed_buffer);
size_decompressed = ICompressionCodec::readDecompressedBlockSize(compressed_buffer);
/// This is for clang static analyzer.
assert(size_decompressed > 0);
@ -157,8 +151,27 @@ size_t CompressedReadBufferBase::readCompressedData(size_t & size_decompressed,
if (size_compressed_without_checksum < header_size)
throw Exception("Can't decompress data: the compressed data size (" + toString(size_compressed_without_checksum)
+ ", this should include header size) is less than the header size (" + toString(header_size) + ")", ErrorCodes::CORRUPTED_DATA);
}
ProfileEvents::increment(ProfileEvents::ReadCompressedBytes, size_compressed_without_checksum + sizeof(Checksum));
/// Read compressed data into compressed_buffer. Get size of decompressed data from block header. Checksum if need.
/// Returns number of compressed bytes read.
size_t CompressedReadBufferBase::readCompressedData(size_t & size_decompressed, size_t & size_compressed_without_checksum, bool always_copy)
{
if (compressed_in->eof())
return 0;
UInt8 header_size = ICompressionCodec::getHeaderSize();
own_compressed_buffer.resize(header_size + sizeof(Checksum));
compressed_in->readStrict(own_compressed_buffer.data(), sizeof(Checksum) + header_size);
readHeaderAndGetCodecAndSize(
own_compressed_buffer.data() + sizeof(Checksum),
header_size,
codec,
size_decompressed,
size_compressed_without_checksum,
allow_different_codecs);
auto additional_size_at_the_end_of_buffer = codec->getAdditionalSizeAtTheEndOfBuffer();
@ -184,9 +197,55 @@ size_t CompressedReadBufferBase::readCompressedData(size_t & size_decompressed,
validateChecksum(compressed_buffer, size_compressed_without_checksum, checksum);
}
ProfileEvents::increment(ProfileEvents::ReadCompressedBytes, size_compressed_without_checksum + sizeof(Checksum));
return size_compressed_without_checksum + sizeof(Checksum);
}
/// Read compressed data into compressed_buffer for asynchronous decompression to avoid the situation of "read compressed block across the compressed_in".
///
/// On success, `compressed_buffer` points directly into the buffer of `compressed_in` (no copy is made),
/// `size_decompressed` and `size_compressed_without_checksum` are filled from the block header, and the
/// return value is the compressed block size plus the checksum size.
/// Returns 0 when the header or the whole block is not available in the current `compressed_in` buffer;
/// the position of `compressed_in` is then the same as on entry. If the header was available but the
/// rest of the block was not, the header has already been parsed, so `codec` and both size outputs
/// may have been updated even though 0 is returned.
size_t CompressedReadBufferBase::readCompressedDataBlockForAsynchronous(size_t & size_decompressed, size_t & size_compressed_without_checksum)
{
UInt8 header_size = ICompressionCodec::getHeaderSize();
/// Make sure the whole header located in 'compressed_in->' buffer.
if (compressed_in->eof() || (compressed_in->available() < (header_size + sizeof(Checksum))))
return 0;
/// Copy checksum and header out of 'compressed_in' so the header can be parsed.
own_compressed_buffer.resize(header_size + sizeof(Checksum));
compressed_in->readStrict(own_compressed_buffer.data(), sizeof(Checksum) + header_size);
readHeaderAndGetCodecAndSize(
own_compressed_buffer.data() + sizeof(Checksum),
header_size,
codec,
size_decompressed,
size_compressed_without_checksum,
allow_different_codecs);
auto additional_size_at_the_end_of_buffer = codec->getAdditionalSizeAtTheEndOfBuffer();
/// Make sure the whole compressed block located in 'compressed_in->' buffer.
/// Otherwise, abandon header and restore original offset of compressed_in
if (compressed_in->offset() >= header_size + sizeof(Checksum) &&
compressed_in->available() >= (size_compressed_without_checksum - header_size) + additional_size_at_the_end_of_buffer + sizeof(Checksum))
{
/// Step back over the header so that compressed_buffer starts at the header, then skip the whole block.
compressed_in->position() -= header_size;
compressed_buffer = compressed_in->position();
compressed_in->position() += size_compressed_without_checksum;
if (!disable_checksum)
{
/// The checksum is the first sizeof(Checksum) bytes copied into own_compressed_buffer above.
Checksum & checksum = *reinterpret_cast<Checksum *>(own_compressed_buffer.data());
validateChecksum(compressed_buffer, size_compressed_without_checksum, checksum);
}
ProfileEvents::increment(ProfileEvents::ReadCompressedBytes, size_compressed_without_checksum + sizeof(Checksum));
return size_compressed_without_checksum + sizeof(Checksum);
}
else
{
/// Undo the readStrict above: rewind over both the checksum and the header.
compressed_in->position() -= (sizeof(Checksum) + header_size);
return 0;
}
}
static void readHeaderAndGetCodec(const char * compressed_buffer, size_t size_decompressed, CompressionCodecPtr & codec, bool allow_different_codecs)
{
@ -216,14 +275,12 @@ static void readHeaderAndGetCodec(const char * compressed_buffer, size_t size_de
}
}
/// Decompress the block currently referenced by `compressed_buffer` into the memory pointed to by `to`.
/// The codec is (re)resolved from the block header first via readHeaderAndGetCodec.
void CompressedReadBufferBase::decompressTo(char * to, size_t size_decompressed, size_t size_compressed_without_checksum)
{
readHeaderAndGetCodec(compressed_buffer, size_decompressed, codec, allow_different_codecs);
codec->decompress(compressed_buffer, size_compressed_without_checksum, to);
}
void CompressedReadBufferBase::decompress(BufferBase::Buffer & to, size_t size_decompressed, size_t size_compressed_without_checksum)
{
readHeaderAndGetCodec(compressed_buffer, size_decompressed, codec, allow_different_codecs);
@ -245,6 +302,17 @@ void CompressedReadBufferBase::decompress(BufferBase::Buffer & to, size_t size_d
codec->decompress(compressed_buffer, size_compressed_without_checksum, to.begin());
}
/// Forward the flush request to the current codec; does nothing when no codec has been set yet.
void CompressedReadBufferBase::flushAsynchronousDecompressRequests() const
{
if (codec)
codec->flushAsynchronousDecompressRequests();
}
/// Forward the requested decompression mode to the current codec; does nothing when no codec has been set yet.
void CompressedReadBufferBase::setDecompressMode(ICompressionCodec::CodecMode mode) const
{
if (codec)
codec->setDecompressMode(mode);
}
/// 'compressed_in' could be initialized lazily, but before first call of 'readCompressedData'.
CompressedReadBufferBase::CompressedReadBufferBase(ReadBuffer * in, bool allow_different_codecs_)
@ -253,7 +321,7 @@ CompressedReadBufferBase::CompressedReadBufferBase(ReadBuffer * in, bool allow_d
}
CompressedReadBufferBase::~CompressedReadBufferBase() = default; /// Proper destruction of unique_ptr of forward-declared type.
CompressedReadBufferBase::~CompressedReadBufferBase() = default; /// Proper destruction of unique_ptr of forward-declared type.
}

View File

@ -39,6 +39,17 @@ protected:
/// Returns number of compressed bytes read.
size_t readCompressedData(size_t & size_decompressed, size_t & size_compressed_without_checksum, bool always_copy);
/// Read compressed data into compressed_buffer for asynchronous decompression to avoid the situation of "read compressed block across the compressed_in".
///
/// Compressed block may not be completely contained in "compressed_in" buffer which means compressed block may be read across the "compressed_in".
/// For native LZ4/ZSTD, it has no problem in facing situation above because they are synchronous.
/// But for asynchronous decompression, such as QPL deflate, it requires source and target buffer for decompression can not be overwritten until execution complete.
///
/// Returns number of compressed bytes read.
/// If the return value is > 0, the whole current block is held in "compressed_in", so asynchronous decompression can be used to boost performance.
/// If the return value is 0, the current block cannot be decompressed asynchronously. In that case, asynchronous requests for previous blocks should be flushed, if any.
size_t readCompressedDataBlockForAsynchronous(size_t & size_decompressed, size_t & size_compressed_without_checksum);
/// Decompress into memory pointed by `to`
void decompressTo(char * to, size_t size_decompressed, size_t size_compressed_without_checksum);
@ -46,6 +57,14 @@ protected:
/// It is more efficient for compression codec NONE but not suitable if you want to decompress into specific location.
void decompress(BufferBase::Buffer & to, size_t size_decompressed, size_t size_compressed_without_checksum);
/// Flush all asynchronous decompress request.
void flushAsynchronousDecompressRequests() const;
/// Set decompression mode: Synchronous/Asynchronous/SoftwareFallback.
/// The mode is "Synchronous" by default.
/// flushAsynchronousDecompressRequests must be called subsequently once set "Asynchronous" mode.
void setDecompressMode(ICompressionCodec::CodecMode mode) const;
public:
/// 'compressed_in' could be initialized lazily, but before first call of 'readCompressedData'.
explicit CompressedReadBufferBase(ReadBuffer * in = nullptr, bool allow_different_codecs_ = false);

View File

@ -91,6 +91,9 @@ void CompressedReadBufferFromFile::seek(size_t offset_in_compressed_file, size_t
size_t CompressedReadBufferFromFile::readBig(char * to, size_t n)
{
size_t bytes_read = 0;
/// The codec mode is only relevant for codecs which support hardware offloading.
ICompressionCodec::CodecMode decompress_mode = ICompressionCodec::CodecMode::Synchronous;
bool read_tail = false;
/// If there are unread bytes in the buffer, then we copy needed to `to`.
if (pos < working_buffer.end())
@ -102,10 +105,28 @@ size_t CompressedReadBufferFromFile::readBig(char * to, size_t n)
size_t size_decompressed = 0;
size_t size_compressed_without_checksum = 0;
size_t new_size_compressed = readCompressedData(size_decompressed, size_compressed_without_checksum, false);
///Try to read block which is entirely located in a single 'compressed_in->' buffer.
size_t new_size_compressed = readCompressedDataBlockForAsynchronous(size_decompressed, size_compressed_without_checksum);
if (new_size_compressed)
{
/// Current block is entirely located in a single 'compressed_in->' buffer.
/// We can set asynchronous decompression mode if supported to boost performance.
decompress_mode = ICompressionCodec::CodecMode::Asynchronous;
}
else
{
/// Current block cannot be decompressed asynchronously, means it probably span across two compressed_in buffers.
/// Meanwhile, asynchronous requests for previous blocks should be flushed if any.
flushAsynchronousDecompressRequests();
/// Fallback to generic API
new_size_compressed = readCompressedData(size_decompressed, size_compressed_without_checksum, false);
decompress_mode = ICompressionCodec::CodecMode::Synchronous;
}
size_compressed = 0; /// file_in no longer points to the end of the block in working_buffer.
if (!new_size_compressed)
return bytes_read;
break;
auto additional_size_at_the_end_of_buffer = codec->getAdditionalSizeAtTheEndOfBuffer();
@ -113,6 +134,7 @@ size_t CompressedReadBufferFromFile::readBig(char * to, size_t n)
/// need to skip some bytes in decompressed data (seek happened before readBig call).
if (nextimpl_working_buffer_offset == 0 && size_decompressed + additional_size_at_the_end_of_buffer <= n - bytes_read)
{
setDecompressMode(decompress_mode);
decompressTo(to + bytes_read, size_decompressed, size_compressed_without_checksum);
bytes_read += size_decompressed;
bytes += size_decompressed;
@ -127,6 +149,8 @@ size_t CompressedReadBufferFromFile::readBig(char * to, size_t n)
assert(size_decompressed + additional_size_at_the_end_of_buffer > 0);
memory.resize(size_decompressed + additional_size_at_the_end_of_buffer);
working_buffer = Buffer(memory.data(), &memory[size_decompressed]);
/// Synchronous mode must be set since we need read partial data immediately from working buffer to target buffer.
setDecompressMode(ICompressionCodec::CodecMode::Synchronous);
decompress(working_buffer, size_decompressed, size_compressed_without_checksum);
/// Read partial data from first block. Won't run here at second block.
@ -145,15 +169,25 @@ size_t CompressedReadBufferFromFile::readBig(char * to, size_t n)
assert(size_decompressed + additional_size_at_the_end_of_buffer > 0);
memory.resize(size_decompressed + additional_size_at_the_end_of_buffer);
working_buffer = Buffer(memory.data(), &memory[size_decompressed]);
// Asynchronous mode can be set here because working_buffer wouldn't be overwritten any more since this is the last block.
setDecompressMode(ICompressionCodec::CodecMode::Asynchronous);
decompress(working_buffer, size_decompressed, size_compressed_without_checksum);
///Read partial data from last block.
pos = working_buffer.begin();
bytes_read += read(to + bytes_read, n - bytes_read);
read_tail = true;
break;
}
}
/// Here we must make sure all asynchronous requests above are completely done.
flushAsynchronousDecompressRequests();
if (read_tail)
{
/// Manually take nextimpl_working_buffer_offset into account, because we don't use
/// nextImpl in this method.
pos = working_buffer.begin();
bytes_read += read(to + bytes_read, n - bytes_read);
}
return bytes_read;
}

View File

@ -0,0 +1,413 @@
#ifdef ENABLE_QPL_COMPRESSION
#include <cstdio>
#include <thread>
#include <Compression/CompressionCodecDeflateQpl.h>
#include <Compression/CompressionFactory.h>
#include <Compression/CompressionInfo.h>
#include <Parsers/ASTIdentifier.h>
#include <Poco/Logger.h>
#include <Common/logger_useful.h>
namespace DB
{
/// Error codes used in this file when throwing from the software codec.
/// Only declared here (`extern`); the constants are defined elsewhere.
namespace ErrorCodes
{
extern const int CANNOT_COMPRESS;
extern const int CANNOT_DECOMPRESS;
}
/// Out-of-class definitions of the static data members of DeflateQplJobHWPool.
/// Pointers to the job objects, one slot per job; filled by the pool constructor.
std::array<qpl_job *, DeflateQplJobHWPool::MAX_HW_JOB_NUMBER> DeflateQplJobHWPool::hw_job_ptr_pool;
/// One atomic flag per slot: set by tryLockJob(), cleared by unLockJob().
std::array<std::atomic_bool, DeflateQplJobHWPool::MAX_HW_JOB_NUMBER> DeflateQplJobHWPool::hw_job_ptr_locks;
/// Becomes true only after the constructor has initialized every job successfully.
bool DeflateQplJobHWPool::job_pool_ready = false;
/// Single allocation that holds the storage of all job objects.
std::unique_ptr<uint8_t[]> DeflateQplJobHWPool::hw_jobs_buffer;
/// Returns the single shared pool object.
/// The pool is a function-local static, so it is constructed on the first call.
DeflateQplJobHWPool & DeflateQplJobHWPool::instance()
{
    static DeflateQplJobHWPool shared_pool;
    return shared_pool;
}
/// Allocates one contiguous buffer for MAX_HW_JOB_NUMBER hardware-path job objects and
/// initializes each of them. If any initialization fails, the pool is marked as not ready
/// (callers then use the software codec) and construction stops at that job.
DeflateQplJobHWPool::DeflateQplJobHWPool()
    : random_engine(std::random_device()())
    , distribution(0, MAX_HW_JOB_NUMBER - 1)
{
    Poco::Logger * log = &Poco::Logger::get("DeflateQplJobHWPool");
    const char * qpl_version = qpl_get_library_version();

    /// Size in bytes needed to store a single job object for the hardware path.
    UInt32 job_size = 0;
    qpl_get_job_size(qpl_path_hardware, &job_size);

    /// One allocation for all job objects; every slot gets its own job_size-byte slice.
    hw_jobs_buffer = std::make_unique<uint8_t[]>(job_size * MAX_HW_JOB_NUMBER);

    uint8_t * slot_memory = hw_jobs_buffer.get();
    for (UInt32 slot = 0; slot < MAX_HW_JOB_NUMBER; ++slot, slot_memory += job_size)
    {
        auto * job = reinterpret_cast<qpl_job *>(slot_memory);
        if (qpl_init_job(qpl_path_hardware, job) != QPL_STS_OK)
        {
            job_pool_ready = false;
            LOG_WARNING(log, "Initialization of hardware-assisted DeflateQpl codec failed, falling back to software DeflateQpl codec. Please check if Intel In-Memory Analytics Accelerator (IAA) is properly set up. QPL Version: {}.",qpl_version);
            return;
        }

        /// Publish the job in its slot and mark the slot as free.
        hw_job_ptr_pool[slot] = job;
        unLockJob(slot);
    }

    job_pool_ready = true;
    LOG_DEBUG(log, "Hardware-assisted DeflateQpl codec is ready! QPL Version: {}",qpl_version);
}
/// Finalizes every job that was successfully initialized and marks the pool as not ready.
DeflateQplJobHWPool::~DeflateQplJobHWPool()
{
    for (UInt32 slot = 0; slot < MAX_HW_JOB_NUMBER; ++slot)
    {
        qpl_job *& job = hw_job_ptr_pool[slot];
        if (!job)
            continue;

        /// Spin until the slot lock is obtained, i.e. until nobody else holds this job.
        while (!tryLockJob(slot))
        {
        }

        qpl_fini_job(job);
        unLockJob(slot);
        job = nullptr;
    }

    job_pool_ready = false;
}
/// Tries to lock a randomly chosen job slot and hands out the job stored in it.
///
/// On success, returns the job pointer and stores in `job_id` the handle that must later be
/// passed to releaseJob(); the handle is `MAX_HW_JOB_NUMBER - slot index`.
/// Returns nullptr (leaving `job_id` untouched) if the pool is not ready or if no free slot
/// was found after more than MAX_HW_JOB_NUMBER retries.
///
/// NOTE(review): `random_engine` is a plain member used here without any synchronization,
/// while the slot locks suggest concurrent callers - confirm whether that is intended.
qpl_job * DeflateQplJobHWPool::acquireJob(UInt32 &job_id)
{
    if (!isJobPoolReady())
        return nullptr;

    auto index = distribution(random_engine);
    for (UInt32 retry = 0; !tryLockJob(index);)
    {
        /// The slot is taken: pick another random slot, give up after too many attempts.
        index = distribution(random_engine);
        if (++retry > MAX_HW_JOB_NUMBER)
            return nullptr;
    }

    job_id = MAX_HW_JOB_NUMBER - index;
    assert(index < MAX_HW_JOB_NUMBER);
    return hw_job_ptr_pool[index];
}
/// Gives a job back to the pool. `job_id` is the handle produced by acquireJob();
/// it is converted back to the slot index before the slot is unlocked.
/// Does nothing when the pool is not ready.
void DeflateQplJobHWPool::releaseJob(UInt32 job_id)
{
    if (!isJobPoolReady())
        return;

    const UInt32 slot = MAX_HW_JOB_NUMBER - job_id;
    unLockJob(slot);
}
/// Atomically flips the lock flag of slot `index` from false to true.
/// Returns true if this call took the lock, false if the slot was already locked.
bool DeflateQplJobHWPool::tryLockJob(UInt32 index)
{
    assert(index < MAX_HW_JOB_NUMBER);

    bool unlocked = false;
    return hw_job_ptr_locks[index].compare_exchange_strong(unlocked, true);
}
/// Clears the lock flag of slot `index`, making the slot available to tryLockJob() again.
void DeflateQplJobHWPool::unLockJob(UInt32 index)
{
    assert(index < MAX_HW_JOB_NUMBER);
    hw_job_ptr_locks[index] = false;
}
//HardwareCodecDeflateQpl
/// Only looks up the logger used for the fallback/diagnostic messages of this codec.
HardwareCodecDeflateQpl::HardwareCodecDeflateQpl()
    : log(&Poco::Logger::get("HardwareCodecDeflateQpl"))
{
}
/// All asynchronous jobs are expected to have been flushed before destruction.
/// Builds without NDEBUG assert that; builds with NDEBUG log a warning and
/// release the jobs that are still registered so their pool slots are not lost.
HardwareCodecDeflateQpl::~HardwareCodecDeflateQpl()
{
#ifndef NDEBUG
    assert(decomp_async_job_map.empty());
#else
    if (decomp_async_job_map.empty())
        return;

    LOG_WARNING(log, "Find un-released job when HardwareCodecDeflateQpl destroy");
    for (const auto & pending : decomp_async_job_map)
        DeflateQplJobHWPool::instance().releaseJob(pending.first);
    decomp_async_job_map.clear();
#endif
}
/// Compresses `source_size` bytes from `source` into `dest` (capacity `dest_size`) using a
/// job from the hardware job pool, executed synchronously via qpl_execute_job.
///
/// Returns the number of compressed bytes, or RET_ERROR if no job could be acquired or the
/// job did not finish with QPL_STS_OK. The job is always returned to the pool before leaving.
Int32 HardwareCodecDeflateQpl::doCompressData(const char * source, UInt32 source_size, char * dest, UInt32 dest_size) const
{
    UInt32 job_id = 0;
    qpl_job * job_ptr = DeflateQplJobHWPool::instance().acquireJob(job_id);
    if (!job_ptr)
    {
        LOG_INFO(log, "DeflateQpl HW codec failed, falling back to SW codec.(Details: doCompressData->acquireJob fail, probably job pool exhausted)");
        return RET_ERROR;
    }

    /// Describe the compression request.
    job_ptr->op = qpl_op_compress;
    job_ptr->level = qpl_default_level;
    job_ptr->flags = QPL_FLAG_FIRST | QPL_FLAG_DYNAMIC_HUFFMAN | QPL_FLAG_LAST | QPL_FLAG_OMIT_VERIFY;
    job_ptr->next_in_ptr = reinterpret_cast<uint8_t *>(const_cast<char *>(source));
    job_ptr->available_in = source_size;
    job_ptr->next_out_ptr = reinterpret_cast<uint8_t *>(dest);
    job_ptr->available_out = dest_size;

    const auto status = qpl_execute_job(job_ptr);
    if (status != QPL_STS_OK)
    {
        LOG_WARNING(log, "DeflateQpl HW codec failed, falling back to SW codec.(Details: doCompressData->qpl_execute_job with error code: {} - please refer to qpl_status in ./contrib/qpl/include/qpl/c_api/status.h)", status);
        DeflateQplJobHWPool::instance().releaseJob(job_id);
        return RET_ERROR;
    }

    /// Read the result before the job is handed back to the pool.
    const UInt32 compressed_size = job_ptr->total_out;
    DeflateQplJobHWPool::instance().releaseJob(job_id);
    return compressed_size;
}
/// Decompresses `source_size` bytes from `source` into `dest` (capacity `uncompressed_size`)
/// using a job from the hardware job pool: the job is submitted and then polled until it is
/// no longer being processed.
///
/// Returns the number of decompressed bytes, or RET_ERROR if no job could be acquired, the
/// submission failed, or the job finished with a status other than QPL_STS_OK. On RET_ERROR
/// the caller is expected to fall back to the software codec.
/// The job is always returned to the pool before leaving.
Int32 HardwareCodecDeflateQpl::doDecompressDataSynchronous(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size)
{
    UInt32 job_id = 0;
    qpl_job * job_ptr = DeflateQplJobHWPool::instance().acquireJob(job_id);
    if (!job_ptr)
    {
        LOG_INFO(log, "DeflateQpl HW codec failed, falling back to SW codec.(Details: doDecompressDataSynchronous->acquireJob fail, probably job pool exhausted)");
        return RET_ERROR;
    }

    /// Describe the decompression request.
    job_ptr->op = qpl_op_decompress;
    job_ptr->next_in_ptr = reinterpret_cast<uint8_t *>(const_cast<char *>(source));
    job_ptr->next_out_ptr = reinterpret_cast<uint8_t *>(dest);
    job_ptr->available_in = source_size;
    job_ptr->available_out = uncompressed_size;
    job_ptr->flags = QPL_FLAG_FIRST | QPL_FLAG_LAST;

    if (auto status = qpl_submit_job(job_ptr); status != QPL_STS_OK)
    {
        DeflateQplJobHWPool::instance().releaseJob(job_id);
        LOG_WARNING(log, "DeflateQpl HW codec failed, falling back to SW codec.(Details: doDecompressDataSynchronous->qpl_submit_job with error code: {} - please refer to qpl_status in ./contrib/qpl/include/qpl/c_api/status.h)", status);
        return RET_ERROR;
    }

    /// Busy waiting till the job is no longer being processed; keep the final status.
    auto check_status = QPL_STS_BEING_PROCESSED;
    do
    {
        _tpause(1, __rdtsc() + 1000);
        check_status = qpl_check_job(job_ptr);
    } while (check_status == QPL_STS_BEING_PROCESSED);

    /// A finished job is not necessarily a successful one: report the failure so that the
    /// caller can redo the decompression with the software codec.
    if (check_status != QPL_STS_OK)
    {
        DeflateQplJobHWPool::instance().releaseJob(job_id);
        LOG_WARNING(log, "DeflateQpl HW codec failed, falling back to SW codec.(Details: doDecompressDataSynchronous->qpl_check_job with error code: {} - please refer to qpl_status in ./contrib/qpl/include/qpl/c_api/status.h)", check_status);
        return RET_ERROR;
    }

    /// Read the result before the job is handed back to the pool.
    const UInt32 decompressed_size = job_ptr->total_out;
    DeflateQplJobHWPool::instance().releaseJob(job_id);
    return decompressed_size;
}
/// Submits a decompression of `source_size` bytes from `source` into `dest` (capacity
/// `uncompressed_size`) to a job from the hardware job pool and returns without waiting.
///
/// On success the job is remembered in `decomp_async_job_map` and its job id is returned;
/// it stays locked until flushAsynchronousDecompressRequests() releases it.
/// Returns RET_ERROR if no job could be acquired or the submission failed.
Int32 HardwareCodecDeflateQpl::doDecompressDataAsynchronous(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size)
{
    UInt32 job_id = 0;
    qpl_job * job_ptr = DeflateQplJobHWPool::instance().acquireJob(job_id);
    if (!job_ptr)
    {
        LOG_INFO(log, "DeflateQpl HW codec failed, falling back to SW codec.(Details: doDecompressDataAsynchronous->acquireJob fail, probably job pool exhausted)");
        return RET_ERROR;
    }

    /// Describe the decompression request.
    job_ptr->op = qpl_op_decompress;
    job_ptr->flags = QPL_FLAG_FIRST | QPL_FLAG_LAST;
    job_ptr->next_in_ptr = reinterpret_cast<uint8_t *>(const_cast<char *>(source));
    job_ptr->available_in = source_size;
    job_ptr->next_out_ptr = reinterpret_cast<uint8_t *>(dest);
    job_ptr->available_out = uncompressed_size;

    const auto status = qpl_submit_job(job_ptr);
    if (status != QPL_STS_OK)
    {
        DeflateQplJobHWPool::instance().releaseJob(job_id);
        LOG_WARNING(log, "DeflateQpl HW codec failed, falling back to SW codec.(Details: doDecompressDataAsynchronous->qpl_execute_job with error code: {} - please refer to qpl_status in ./contrib/qpl/include/qpl/c_api/status.h)", status);
        return RET_ERROR;
    }

    /// Keep track of the in-flight job so that the flush can wait for it and release it.
    decomp_async_job_map.insert({job_id, job_ptr});
    return job_id;
}
void HardwareCodecDeflateQpl::flushAsynchronousDecompressRequests()
{
UInt32 n_jobs_processing = decomp_async_job_map.size();
std::map<UInt32, qpl_job *>::iterator it = decomp_async_job_map.begin();
while (n_jobs_processing)
{
UInt32 job_id = 0;
qpl_job * job_ptr = nullptr;
job_id = it->first;
job_ptr = it->second;
if (qpl_check_job(job_ptr) == QPL_STS_BEING_PROCESSED)
{
it++;
}
else
{
it = decomp_async_job_map.erase(it);
DeflateQplJobHWPool::instance().releaseJob(job_id);
n_jobs_processing--;
if (n_jobs_processing <= 0)
break;
}
if (it == decomp_async_job_map.end())
{
it = decomp_async_job_map.begin();
_tpause(1, __rdtsc() + 1000);
}
}
}
/// Finalizes the lazily created software job, if one was ever created.
/// `sw_job` stays nullptr until getJobCodecPtr() is called, so it must be checked
/// for being non-null before it is passed to qpl_fini_job.
SoftwareCodecDeflateQpl::~SoftwareCodecDeflateQpl()
{
    if (sw_job)
        qpl_fini_job(sw_job);
}
/// Returns the job object of the software path, creating and initializing it on first use.
///
/// Throws Exception(CANNOT_COMPRESS) if qpl_init_job fails. In that case nothing is cached:
/// `sw_job` stays nullptr and the buffer is freed, so a later call retries the initialization
/// instead of handing out a job that was never initialized.
qpl_job * SoftwareCodecDeflateQpl::getJobCodecPtr()
{
    if (!sw_job)
    {
        /// Size in bytes needed to store a job object for the software path.
        UInt32 size = 0;
        qpl_get_job_size(qpl_path_software, &size);

        sw_buffer = std::make_unique<uint8_t[]>(size);
        auto * new_job = reinterpret_cast<qpl_job *>(sw_buffer.get());

        // Job initialization
        if (auto status = qpl_init_job(qpl_path_software, new_job); status != QPL_STS_OK)
        {
            sw_buffer.reset();
            throw Exception(ErrorCodes::CANNOT_COMPRESS,
                "Initialization of DeflateQpl software fallback codec failed. (Details: qpl_init_job with error code: {} - please refer to qpl_status in ./contrib/qpl/include/qpl/c_api/status.h)", status);
        }

        /// Publish the job only after it has been initialized successfully.
        sw_job = new_job;
    }
    return sw_job;
}
/// Compresses `source_size` bytes from `source` into `dest` (capacity `dest_size`) with the
/// software job and returns the number of compressed bytes.
/// Throws Exception(CANNOT_COMPRESS) if the job does not finish with QPL_STS_OK
/// (getJobCodecPtr() may throw as well).
UInt32 SoftwareCodecDeflateQpl::doCompressData(const char * source, UInt32 source_size, char * dest, UInt32 dest_size)
{
    qpl_job * job_ptr = getJobCodecPtr();

    /// Describe the compression request.
    job_ptr->op = qpl_op_compress;
    job_ptr->level = qpl_default_level;
    job_ptr->flags = QPL_FLAG_FIRST | QPL_FLAG_DYNAMIC_HUFFMAN | QPL_FLAG_LAST | QPL_FLAG_OMIT_VERIFY;
    job_ptr->next_in_ptr = reinterpret_cast<uint8_t *>(const_cast<char *>(source));
    job_ptr->available_in = source_size;
    job_ptr->next_out_ptr = reinterpret_cast<uint8_t *>(dest);
    job_ptr->available_out = dest_size;

    const auto status = qpl_execute_job(job_ptr);
    if (status != QPL_STS_OK)
    {
        throw Exception(ErrorCodes::CANNOT_COMPRESS,
            "Execution of DeflateQpl software fallback codec failed. (Details: qpl_execute_job with error code: {} - please refer to qpl_status in ./contrib/qpl/include/qpl/c_api/status.h)", status);
    }

    return job_ptr->total_out;
}
/// Decompresses `source_size` bytes from `source` into `dest` (capacity `uncompressed_size`)
/// with the software job.
/// Throws Exception(CANNOT_DECOMPRESS) if the job does not finish with QPL_STS_OK
/// (getJobCodecPtr() may throw as well).
void SoftwareCodecDeflateQpl::doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size)
{
    qpl_job * job_ptr = getJobCodecPtr();

    /// Describe the decompression request.
    job_ptr->op = qpl_op_decompress;
    job_ptr->flags = QPL_FLAG_FIRST | QPL_FLAG_LAST;
    job_ptr->next_in_ptr = reinterpret_cast<uint8_t *>(const_cast<char *>(source));
    job_ptr->available_in = source_size;
    job_ptr->next_out_ptr = reinterpret_cast<uint8_t *>(dest);
    job_ptr->available_out = uncompressed_size;

    const auto status = qpl_execute_job(job_ptr);
    if (status != QPL_STS_OK)
    {
        throw Exception(ErrorCodes::CANNOT_DECOMPRESS,
            "Execution of DeflateQpl software fallback codec failed. (Details: qpl_execute_job with error code: {} - please refer to qpl_status in ./contrib/qpl/include/qpl/c_api/status.h)", status);
    }
}
//CompressionCodecDeflateQpl
/// Creates both back-ends (hardware and software) up front and registers the codec
/// description "DEFLATE_QPL".
CompressionCodecDeflateQpl::CompressionCodecDeflateQpl()
    : hw_codec(std::make_unique<HardwareCodecDeflateQpl>())
    , sw_codec(std::make_unique<SoftwareCodecDeflateQpl>())
{
    setCodecDescription("DEFLATE_QPL");
}
/// Returns the method byte that identifies this codec: CompressionMethodByte::DeflateQpl.
uint8_t CompressionCodecDeflateQpl::getMethodByte() const
{
    constexpr auto method = CompressionMethodByte::DeflateQpl;
    return static_cast<uint8_t>(method);
}
/// Feeds the codec description into `hash` via its updateTreeHash method.
void CompressionCodecDeflateQpl::updateHash(SipHash & hash) const
{
    const auto description = getCodecDesc();
    description->updateTreeHash(hash);
}
/// Upper bound for the size of the compressed data produced from `uncompressed_size` bytes:
/// size + size/2^12 + size/2^14 + size/2^25 + 13 (aligned with ZLIB), in UInt32 arithmetic.
UInt32 CompressionCodecDeflateQpl::getMaxCompressedDataSize(UInt32 uncompressed_size) const
{
    UInt32 bound = uncompressed_size;
    bound += uncompressed_size >> 12;
    bound += uncompressed_size >> 14;
    bound += uncompressed_size >> 25;
    bound += 13;
    return bound;
}
/// Compresses `source` into `dest` and returns the number of compressed bytes.
/// The hardware codec is tried first when the hardware job pool is ready; if it is not
/// ready or the hardware codec reports RET_ERROR, the software codec does the work.
UInt32 CompressionCodecDeflateQpl::doCompressData(const char * source, UInt32 source_size, char * dest) const
{
    const UInt32 dest_capacity = getMaxCompressedDataSize(source_size);

    if (DeflateQplJobHWPool::instance().isJobPoolReady())
    {
        const Int32 hw_result = hw_codec->doCompressData(source, source_size, dest, dest_capacity);
        if (hw_result != HardwareCodecDeflateQpl::RET_ERROR)
            return hw_result;
    }

    return sw_codec->doCompressData(source, source_size, dest, dest_capacity);
}
/// Decompresses `source` into `dest` according to the current decompression mode:
///  - Synchronous: hardware codec (submit and wait) if the job pool is ready, otherwise or
///    on RET_ERROR the software codec;
///  - Asynchronous: hardware codec (submit only) if the job pool is ready, otherwise or on
///    RET_ERROR the software codec;
///  - SoftwareFallback: always the software codec.
void CompressionCodecDeflateQpl::doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size) const
{
    switch (getDecompressMode())
    {
        case CodecMode::Synchronous:
        {
            const bool handled_by_hw = DeflateQplJobHWPool::instance().isJobPoolReady()
                && hw_codec->doDecompressDataSynchronous(source, source_size, dest, uncompressed_size) != HardwareCodecDeflateQpl::RET_ERROR;
            if (!handled_by_hw)
                sw_codec->doDecompressData(source, source_size, dest, uncompressed_size);
            return;
        }
        case CodecMode::Asynchronous:
        {
            const bool handled_by_hw = DeflateQplJobHWPool::instance().isJobPoolReady()
                && hw_codec->doDecompressDataAsynchronous(source, source_size, dest, uncompressed_size) != HardwareCodecDeflateQpl::RET_ERROR;
            if (!handled_by_hw)
                sw_codec->doDecompressData(source, source_size, dest, uncompressed_size);
            return;
        }
        case CodecMode::SoftwareFallback:
        {
            sw_codec->doDecompressData(source, source_size, dest, uncompressed_size);
            return;
        }
    }
    __builtin_unreachable();
}
/// Waits for the asynchronous requests of the hardware codec (only when the hardware job
/// pool is ready) and then switches the decompression mode back to Synchronous.
void CompressionCodecDeflateQpl::flushAsynchronousDecompressRequests()
{
    const bool hw_available = DeflateQplJobHWPool::instance().isJobPoolReady();
    if (hw_available)
        hw_codec->flushAsynchronousDecompressRequests();

    /// Whatever happened above, the default mode after a flush is synchronous.
    setDecompressMode(CodecMode::Synchronous);
}
/// Registers the codec in `factory` under the name "DEFLATE_QPL" with the method byte
/// CompressionMethodByte::DeflateQpl; every creation yields a new CompressionCodecDeflateQpl.
void registerCodecDeflateQpl(CompressionCodecFactory & factory)
{
    const auto method_byte = static_cast<char>(CompressionMethodByte::DeflateQpl);
    auto make_codec = [] { return std::make_shared<CompressionCodecDeflateQpl>(); };
    factory.registerSimpleCompressionCodec("DEFLATE_QPL", method_byte, make_codec);
}
}
#endif

View File

@ -0,0 +1,120 @@
#pragma once
#include <Compression/ICompressionCodec.h>
#include <qpl/qpl.h>
#include <random>
namespace Poco
{
class Logger;
}
namespace DB
{
/// DeflateQplJobHWPool is a resource pool that provides the job objects.
/// A job object is used for storing context information while a compression job is offloaded to the HW accelerator.
/// The pool is accessed through instance(); its slots are guarded by per-slot atomic lock flags.
class DeflateQplJobHWPool
{
public:
/// Allocates and initializes all job objects; sets the "ready" flag only if every job was initialized.
DeflateQplJobHWPool();
/// Finalizes every initialized job and clears the "ready" flag.
~DeflateQplJobHWPool();
/// Locks a randomly chosen free slot and returns its job; `job_id` receives the handle for releaseJob().
/// Returns nullptr if the pool is not ready or no free slot was found after too many retries.
qpl_job * acquireJob(UInt32 &job_id);
/// Unlocks the slot that belongs to `job_id` (a handle obtained from acquireJob()).
static void releaseJob(UInt32 job_id);
/// Returns a reference to the flag that tells whether the hardware job pool was initialized successfully.
static const bool & isJobPoolReady() { return job_pool_ready; }
/// Returns the single shared pool object.
static DeflateQplJobHWPool & instance();
private:
/// Tries to take the lock of slot `index`; returns true on success.
static bool tryLockJob(UInt32 index);
/// Releases the lock of slot `index`.
static void unLockJob(UInt32 index);
/// Maximum jobs running in parallel supported by IAA hardware
static constexpr auto MAX_HW_JOB_NUMBER = 1024;
/// Entire buffer for storing all job objects
static std::unique_ptr<uint8_t[]> hw_jobs_buffer;
/// Job pool for storing all job object pointers
static std::array<qpl_job *, DeflateQplJobHWPool::MAX_HW_JOB_NUMBER> hw_job_ptr_pool;
/// Locks for accessing each job object pointer
static std::array<std::atomic_bool, DeflateQplJobHWPool::MAX_HW_JOB_NUMBER> hw_job_ptr_locks;
/// True once all jobs were initialized successfully by the constructor.
static bool job_pool_ready;
/// Random source and range [0, MAX_HW_JOB_NUMBER - 1] used by acquireJob() to pick a slot.
std::mt19937 random_engine;
std::uniform_int_distribution<int> distribution;
};
/// DEFLATE codec that runs on the QPL software path.
/// It owns a single job object that is created lazily by getJobCodecPtr().
class SoftwareCodecDeflateQpl
{
public:
~SoftwareCodecDeflateQpl();
/// Compresses `source` into `dest` (capacity `dest_size`); returns the compressed size. Throws on failure.
UInt32 doCompressData(const char * source, UInt32 source_size, char * dest, UInt32 dest_size);
/// Decompresses `source` into `dest` (capacity `uncompressed_size`). Throws on failure.
void doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size);
private:
/// The job object; nullptr until the first call of getJobCodecPtr(). Points into `sw_buffer`.
qpl_job * sw_job = nullptr;
/// Storage of the job object.
std::unique_ptr<uint8_t[]> sw_buffer;
/// Returns the job object, creating and initializing it on first use.
qpl_job * getJobCodecPtr();
};
/// DEFLATE codec that runs on the QPL hardware path, using jobs taken from DeflateQplJobHWPool.
class HardwareCodecDeflateQpl
{
public:
/// RET_ERROR stands for a hardware codec failure; the caller needs to fall back to the software codec.
static constexpr Int32 RET_ERROR = -1;
HardwareCodecDeflateQpl();
~HardwareCodecDeflateQpl();
/// Compresses `source` into `dest` (capacity `dest_size`); returns the compressed size or RET_ERROR.
Int32 doCompressData(const char * source, UInt32 source_size, char * dest, UInt32 dest_size) const;
/// Submits a job request to the IAA hardware and then busy-waits till it completes.
/// Returns the decompressed size or RET_ERROR.
Int32 doDecompressDataSynchronous(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size);
/// Submits a job request to the IAA hardware and returns immediately; the job is remembered in "decomp_async_job_map".
/// Returns the job ID or RET_ERROR.
Int32 doDecompressDataAsynchronous(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size);
/// Flush result for all previous requests, which means busy waiting till all the jobs in "decomp_async_job_map" are finished.
/// Must be called after one or several calls of doDecompressDataAsynchronous.
void flushAsynchronousDecompressRequests();
private:
/// Asynchronous job map for decompression: job ID - job object.
/// For each submission, push job ID && job object into this map;
/// For flush, pop out job ID && job object from this map. Use job ID to release job lock and use job object to check job status till complete.
std::map<UInt32, qpl_job *> decomp_async_job_map;
/// Logger used for the fallback/diagnostic messages.
Poco::Logger * log;
};
/// The "DEFLATE_QPL" compression codec.
/// It owns a hardware codec and a software codec; the implementation tries the hardware codec
/// when the hardware job pool is ready and falls back to the software codec otherwise.
class CompressionCodecDeflateQpl : public ICompressionCodec
{
public:
CompressionCodecDeflateQpl();
/// Returns the method byte of this codec (CompressionMethodByte::DeflateQpl).
uint8_t getMethodByte() const override;
/// Feeds the codec description into `hash`.
void updateHash(SipHash & hash) const override;
protected:
bool isCompression() const override
{
return true;
}
bool isGenericCompression() const override
{
return true;
}
/// Compresses `source` into `dest`; returns the number of compressed bytes.
UInt32 doCompressData(const char * source, UInt32 source_size, char * dest) const override;
/// Decompresses `source` into `dest` according to the current decompression mode.
void doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size) const override;
/// Flush result for previous asynchronous decompression requests on asynchronous mode.
/// Afterwards the decompression mode is set back to Synchronous.
void flushAsynchronousDecompressRequests() override;
private:
/// Upper bound of the compressed size for `uncompressed_size` input bytes.
UInt32 getMaxCompressedDataSize(UInt32 uncompressed_size) const override;
std::unique_ptr<HardwareCodecDeflateQpl> hw_codec;
std::unique_ptr<SoftwareCodecDeflateQpl> sw_codec;
};
}

View File

@ -166,7 +166,7 @@ void registerCodecLZ4(CompressionCodecFactory & factory);
void registerCodecLZ4HC(CompressionCodecFactory & factory);
void registerCodecZSTD(CompressionCodecFactory & factory);
void registerCodecMultiple(CompressionCodecFactory & factory);
void registerCodecDeflateQpl(CompressionCodecFactory & factory);
/// Keeper use only general-purpose codecs, so we don't need these special codecs
/// in standalone build
@ -188,7 +188,6 @@ CompressionCodecFactory::CompressionCodecFactory()
registerCodecZSTD(*this);
registerCodecLZ4HC(*this);
registerCodecMultiple(*this);
#ifndef KEEPER_STANDALONE_BUILD
registerCodecDelta(*this);
registerCodecT64(*this);
@ -196,6 +195,9 @@ CompressionCodecFactory::CompressionCodecFactory()
registerCodecGorilla(*this);
registerCodecEncrypted(*this);
registerCodecFPC(*this);
#ifdef ENABLE_QPL_COMPRESSION
registerCodecDeflateQpl(*this);
#endif
#endif
default_codec = get("LZ4", {});

View File

@ -45,7 +45,8 @@ enum class CompressionMethodByte : uint8_t
Gorilla = 0x95,
AES_128_GCM_SIV = 0x96,
AES_256_GCM_SIV = 0x97,
FPC = 0x98
FPC = 0x98,
DeflateQpl = 0x99,
};
}

View File

@ -91,7 +91,6 @@ UInt32 ICompressionCodec::compress(const char * source, UInt32 source_size, char
return header_size + compressed_bytes_written;
}
UInt32 ICompressionCodec::decompress(const char * source, UInt32 source_size, char * dest) const
{
assert(source != nullptr && dest != nullptr);

View File

@ -45,9 +45,37 @@ public:
/// Compressed bytes from uncompressed source to dest. Dest should preallocate memory
UInt32 compress(const char * source, UInt32 source_size, char * dest) const;
/// Decompress bytes from compressed source to dest. Dest should preallocate memory
/// Decompress bytes from compressed source to dest. Dest should preallocate memory;
UInt32 decompress(const char * source, UInt32 source_size, char * dest) const;
/// Three kinds of codec mode:
/// Synchronous mode, which is the default;
/// --- For a codec with a HW decompressor, it means: submit the request to HW and busy-wait till it completes.
/// Asynchronous mode, which requires HW decompressor support;
/// --- For a codec with a HW decompressor, it means: submit the request to HW and return immediately.
/// --- Must be used in pair with flushAsynchronousDecompressRequests.
/// SoftwareFallback mode is defined exclusively for codecs with a HW decompressor; it makes them use the SW codec.
enum class CodecMode
{
Synchronous,
Asynchronous,
SoftwareFallback
};
/// Returns the decompression mode that is currently set for this codec.
CodecMode getDecompressMode() const
{
    return decompressMode;
}
/// Sets the decompression mode of this codec.
/// If the mode is set to CodecMode::Asynchronous, the decompression calls must be followed
/// by flushAsynchronousDecompressRequests.
void setDecompressMode(CodecMode mode)
{
    decompressMode = mode;
}
/// Flush the results of previous asynchronous decompression requests.
/// This function must be called after one or several requests were offloaded to HW,
/// to make sure the asynchronous results have been written to the target buffers completely.
/// The source and target buffers of a decompression must not be overwritten until this function has finished;
/// otherwise it would conflict with the HW offloading and cause an exception.
/// For QPL deflate, the maximum number of requests is DeflateQplJobHWPool::MAX_HW_JOB_NUMBER.
/// The default implementation does nothing.
virtual void flushAsynchronousDecompressRequests(){}
/// Number of bytes, that will be used to compress uncompressed_size bytes with current codec
virtual UInt32 getCompressedReserveSize(UInt32 uncompressed_size) const
{
@ -103,6 +131,7 @@ protected:
private:
ASTPtr full_codec_desc;
CodecMode decompressMode{CodecMode::Synchronous};
};
}