Merge pull request #50775 from rschu1ze/non-experimental-qpl-deflate

Mark QPL_DEFLATE non-experimental, but off by default
Robert Schulze 2023-06-21 10:43:16 +02:00 committed by GitHub
commit 06e8590c8e
21 changed files with 182 additions and 18 deletions


@ -380,11 +380,15 @@ High compression levels are useful for asymmetric scenarios, like compress once,
`DEFLATE_QPL` — [Deflate compression algorithm](https://github.com/intel/qpl) implemented by Intel® Query Processing Library. Some limitations apply:
- DEFLATE_QPL is experimental and can only be used after setting configuration parameter `allow_experimental_codecs=1`.
- DEFLATE_QPL is disabled by default and can only be used after setting configuration parameter `enable_deflate_qpl_codec = 1`.
- DEFLATE_QPL requires a ClickHouse build compiled with SSE 4.2 instructions (by default, this is the case). Refer to [Build ClickHouse with DEFLATE_QPL](/docs/en/development/building_and_benchmarking_deflate_qpl.md/#Build-Clickhouse-with-DEFLATE_QPL) for more details.
- DEFLATE_QPL works best if the system has an Intel® IAA (In-Memory Analytics Accelerator) offloading device. Refer to [Accelerator Configuration](https://intel.github.io/qpl/documentation/get_started_docs/installation.html#accelerator-configuration) and [Benchmark with DEFLATE_QPL](/docs/en/development/building_and_benchmarking_deflate_qpl.md/#Run-Benchmark-with-DEFLATE_QPL) for more details.
- DEFLATE_QPL-compressed data can only be transferred between ClickHouse nodes compiled with SSE 4.2 enabled.
:::note
DEFLATE_QPL is not available in ClickHouse Cloud.
:::
### Specialized Codecs
These codecs are designed to make compression more effective by using specific features of data. Some of these codecs do not compress data themselves. Instead, they prepare the data for a general-purpose codec, which compresses it better than it would without this preparation.
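For reference, a minimal usage sketch of the behaviour described in the new docs text (the table and column names are illustrative and not taken from this PR; a QPL-enabled x86 build is assumed):

SET enable_deflate_qpl_codec = 1;

-- Illustrative table; without the SET above, the CREATE is rejected by the validation added in this PR.
CREATE TABLE example_qpl
(
    id UInt64 CODEC(DEFLATE_QPL),
    payload String CODEC(DEFLATE_QPL)
)
ENGINE = MergeTree
ORDER BY id;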


@ -588,7 +588,7 @@ void Connection::sendQuery(
if (method == "ZSTD")
level = settings->network_zstd_compression_level;
CompressionCodecFactory::instance().validateCodec(method, level, !settings->allow_suspicious_codecs, settings->allow_experimental_codecs);
CompressionCodecFactory::instance().validateCodec(method, level, !settings->allow_suspicious_codecs, settings->allow_experimental_codecs, settings->enable_deflate_qpl_codec);
compression_codec = CompressionCodecFactory::instance().get(method, level);
}
else


@ -8,6 +8,7 @@
#include <Poco/Logger.h>
#include <Common/logger_useful.h>
#include "libaccel_config.h"
#include <Common/MemorySanitizer.h>
namespace DB
{
@ -382,6 +383,11 @@ UInt32 CompressionCodecDeflateQpl::getMaxCompressedDataSize(UInt32 uncompressed_
UInt32 CompressionCodecDeflateQpl::doCompressData(const char * source, UInt32 source_size, char * dest) const
{
/// QPL library is using AVX-512 with some shuffle operations.
/// MemorySanitizer does not understand that uninitialized memory in a SIMD register is harmless when it does not end up in the result of the shuffle.
#if defined(MEMORY_SANITIZER)
__msan_unpoison(dest, getMaxCompressedDataSize(source_size));
#endif
Int32 res = HardwareCodecDeflateQpl::RET_ERROR;
if (DeflateQplJobHWPool::instance().isJobPoolReady())
res = hw_codec->doCompressData(source, source_size, dest, getMaxCompressedDataSize(source_size));
@ -392,6 +398,11 @@ UInt32 CompressionCodecDeflateQpl::doCompressData(const char * source, UInt32 so
void CompressionCodecDeflateQpl::doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size) const
{
/// QPL library is using AVX-512 with some shuffle operations.
/// MemorySanitizer does not understand that uninitialized memory in a SIMD register is harmless when it does not end up in the result of the shuffle.
#if defined(MEMORY_SANITIZER)
__msan_unpoison(dest, uncompressed_size);
#endif
switch (getDecompressMode())
{
case CodecMode::Synchronous:


@ -98,7 +98,7 @@ public:
protected:
bool isCompression() const override { return true; }
bool isGenericCompression() const override { return true; }
bool isExperimental() const override { return true; }
bool isDeflateQpl() const override { return true; }
UInt32 doCompressData(const char * source, UInt32 source_size, char * dest) const override;
void doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size) const override;


@ -40,10 +40,10 @@ public:
CompressionCodecPtr getDefaultCodec() const;
/// Validate codecs AST specified by user and parses codecs description (substitute default parameters)
ASTPtr validateCodecAndGetPreprocessedAST(const ASTPtr & ast, const DataTypePtr & column_type, bool sanity_check, bool allow_experimental_codecs) const;
ASTPtr validateCodecAndGetPreprocessedAST(const ASTPtr & ast, const DataTypePtr & column_type, bool sanity_check, bool allow_experimental_codecs, bool enable_deflate_qpl_codec) const;
/// Validate codecs AST specified by user
void validateCodec(const String & family_name, std::optional<int> level, bool sanity_check, bool allow_experimental_codecs) const;
void validateCodec(const String & family_name, std::optional<int> level, bool sanity_check, bool allow_experimental_codecs, bool enable_deflate_qpl_codec) const;
/// Get codec by AST and possible column_type. Some codecs can use
/// information about type to improve inner settings, but every codec should


@ -34,7 +34,7 @@ namespace ErrorCodes
void CompressionCodecFactory::validateCodec(
const String & family_name, std::optional<int> level, bool sanity_check, bool allow_experimental_codecs) const
const String & family_name, std::optional<int> level, bool sanity_check, bool allow_experimental_codecs, bool enable_deflate_qpl_codec) const
{
if (family_name.empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Compression codec name cannot be empty");
@ -43,13 +43,13 @@ void CompressionCodecFactory::validateCodec(
{
auto literal = std::make_shared<ASTLiteral>(static_cast<UInt64>(*level));
validateCodecAndGetPreprocessedAST(makeASTFunction("CODEC", makeASTFunction(Poco::toUpper(family_name), literal)),
{}, sanity_check, allow_experimental_codecs);
{}, sanity_check, allow_experimental_codecs, enable_deflate_qpl_codec);
}
else
{
auto identifier = std::make_shared<ASTIdentifier>(Poco::toUpper(family_name));
validateCodecAndGetPreprocessedAST(makeASTFunction("CODEC", identifier),
{}, sanity_check, allow_experimental_codecs);
{}, sanity_check, allow_experimental_codecs, enable_deflate_qpl_codec);
}
}
@ -77,7 +77,7 @@ bool innerDataTypeIsFloat(const DataTypePtr & type)
}
ASTPtr CompressionCodecFactory::validateCodecAndGetPreprocessedAST(
const ASTPtr & ast, const DataTypePtr & column_type, bool sanity_check, bool allow_experimental_codecs) const
const ASTPtr & ast, const DataTypePtr & column_type, bool sanity_check, bool allow_experimental_codecs, bool enable_deflate_qpl_codec) const
{
if (const auto * func = ast->as<ASTFunction>())
{
@ -159,6 +159,12 @@ ASTPtr CompressionCodecFactory::validateCodecAndGetPreprocessedAST(
" You can enable it with the 'allow_experimental_codecs' setting.",
codec_family_name);
if (!enable_deflate_qpl_codec && result_codec->isDeflateQpl())
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Codec {} is disabled by default."
" You can enable it with the 'enable_deflate_qpl_codec' setting.",
codec_family_name);
codecs_descriptions->children.emplace_back(result_codec->getCodecDesc());
}
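With this check in place, specifying the codec while the setting is off fails validation. A sketch of the expected outcome (the table name is illustrative; the error text is paraphrased from the exception added above, and exact client output may differ):

CREATE TABLE t (x UInt64 CODEC(DEFLATE_QPL)) ENGINE = MergeTree ORDER BY x;
-- BAD_ARGUMENTS: Codec DEFLATE_QPL is disabled by default.
--                You can enable it with the 'enable_deflate_qpl_codec' setting.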


@ -109,6 +109,9 @@ public:
/// It will not be allowed to be used unless the user turns off the safety switch.
virtual bool isExperimental() const { return false; }
/// Is this the DEFLATE_QPL codec?
virtual bool isDeflateQpl() const { return false; }
/// If it does nothing.
virtual bool isNone() const { return false; }


@ -327,6 +327,7 @@ class IColumn;
M(Bool, allow_distributed_ddl, true, "If it is set to true, then a user is allowed to execute distributed DDL queries.", 0) \
M(Bool, allow_suspicious_codecs, false, "If it is set to true, allow to specify meaningless compression codecs.", 0) \
M(Bool, allow_experimental_codecs, false, "If it is set to true, allow to specify experimental compression codecs (but we don't have those yet and this option does nothing).", 0) \
M(Bool, enable_deflate_qpl_codec, false, "Enable/disable the DEFLATE_QPL codec.", 0) \
M(UInt64, query_profiler_real_time_period_ns, QUERY_PROFILER_DEFAULT_SAMPLE_RATE_NS, "Period for real clock timer of query profiler (in nanoseconds). Set 0 value to turn off the real clock query profiler. Recommended value is at least 10000000 (100 times a second) for single queries or 1000000000 (once a second) for cluster-wide profiling.", 0) \
M(UInt64, query_profiler_cpu_time_period_ns, QUERY_PROFILER_DEFAULT_SAMPLE_RATE_NS, "Period for CPU clock timer of query profiler (in nanoseconds). Set 0 value to turn off the CPU clock query profiler. Recommended value is at least 10000000 (100 times a second) for single queries or 1000000000 (once a second) for cluster-wide profiling.", 0) \
M(Bool, metrics_perf_events_enabled, false, "If enabled, some of the perf events will be measured throughout queries' execution.", 0) \
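The new `enable_deflate_qpl_codec` entry is a regular session setting that defaults to 0 (off), so its current value can be inspected like any other setting; a sketch assuming the standard system.settings table:

SELECT name, value, changed FROM system.settings WHERE name = 'enable_deflate_qpl_codec';
-- Expected default after this PR: value = 0, changed = 0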


@ -571,6 +571,7 @@ ColumnsDescription InterpreterCreateQuery::getColumnsDescription(
bool sanity_check_compression_codecs = !attach && !context_->getSettingsRef().allow_suspicious_codecs;
bool allow_experimental_codecs = attach || context_->getSettingsRef().allow_experimental_codecs;
bool enable_deflate_qpl_codec = attach || context_->getSettingsRef().enable_deflate_qpl_codec;
ColumnsDescription res;
auto name_type_it = column_names_and_types.begin();
@ -631,7 +632,7 @@ ColumnsDescription InterpreterCreateQuery::getColumnsDescription(
if (col_decl.default_specifier == "ALIAS")
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Cannot specify codec for column type ALIAS");
column.codec = CompressionCodecFactory::instance().validateCodecAndGetPreprocessedAST(
col_decl.codec, column.type, sanity_check_compression_codecs, allow_experimental_codecs);
col_decl.codec, column.type, sanity_check_compression_codecs, allow_experimental_codecs, enable_deflate_qpl_codec);
}
if (col_decl.ttl)


@ -1775,7 +1775,7 @@ void TCPHandler::initBlockOutput(const Block & block)
if (state.compression == Protocol::Compression::Enable)
{
CompressionCodecFactory::instance().validateCodec(method, level, !query_settings.allow_suspicious_codecs, query_settings.allow_experimental_codecs);
CompressionCodecFactory::instance().validateCodec(method, level, !query_settings.allow_suspicious_codecs, query_settings.allow_experimental_codecs, query_settings.enable_deflate_qpl_codec);
state.maybe_compressed_out = std::make_shared<CompressedWriteBuffer>(
*out, CompressionCodecFactory::instance().get(method, level));


@ -388,7 +388,7 @@ void AlterCommand::apply(StorageInMemoryMetadata & metadata, ContextPtr context)
column.comment = *comment;
if (codec)
column.codec = CompressionCodecFactory::instance().validateCodecAndGetPreprocessedAST(codec, data_type, false, true);
column.codec = CompressionCodecFactory::instance().validateCodecAndGetPreprocessedAST(codec, data_type, false, true, true);
column.ttl = ttl;
@ -429,7 +429,7 @@ void AlterCommand::apply(StorageInMemoryMetadata & metadata, ContextPtr context)
else
{
if (codec)
column.codec = CompressionCodecFactory::instance().validateCodecAndGetPreprocessedAST(codec, data_type ? data_type : column.type, false, true);
column.codec = CompressionCodecFactory::instance().validateCodecAndGetPreprocessedAST(codec, data_type ? data_type : column.type, false, true, true);
if (comment)
column.comment = *comment;
@ -1067,7 +1067,7 @@ void AlterCommands::validate(const StoragePtr & table, ContextPtr context) const
"this column name is reserved for lightweight delete feature", backQuote(column_name));
if (command.codec)
CompressionCodecFactory::instance().validateCodecAndGetPreprocessedAST(command.codec, command.data_type, !context->getSettingsRef().allow_suspicious_codecs, context->getSettingsRef().allow_experimental_codecs);
CompressionCodecFactory::instance().validateCodecAndGetPreprocessedAST(command.codec, command.data_type, !context->getSettingsRef().allow_suspicious_codecs, context->getSettingsRef().allow_experimental_codecs, context->getSettingsRef().enable_deflate_qpl_codec);
all_columns.add(ColumnDescription(column_name, command.data_type));
}
@ -1093,7 +1093,7 @@ void AlterCommands::validate(const StoragePtr & table, ContextPtr context) const
{
if (all_columns.hasAlias(column_name))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Cannot specify codec for column type ALIAS");
CompressionCodecFactory::instance().validateCodecAndGetPreprocessedAST(command.codec, command.data_type, !context->getSettingsRef().allow_suspicious_codecs, context->getSettingsRef().allow_experimental_codecs);
CompressionCodecFactory::instance().validateCodecAndGetPreprocessedAST(command.codec, command.data_type, !context->getSettingsRef().allow_suspicious_codecs, context->getSettingsRef().allow_experimental_codecs, context->getSettingsRef().enable_deflate_qpl_codec);
}
auto column_default = all_columns.getDefault(column_name);
if (column_default)


@ -130,7 +130,7 @@ void ColumnDescription::readText(ReadBuffer & buf)
comment = col_ast->comment->as<ASTLiteral &>().value.get<String>();
if (col_ast->codec)
codec = CompressionCodecFactory::instance().validateCodecAndGetPreprocessedAST(col_ast->codec, type, false, true);
codec = CompressionCodecFactory::instance().validateCodecAndGetPreprocessedAST(col_ast->codec, type, false, true, true);
if (col_ast->ttl)
ttl = col_ast->ttl;


@ -733,7 +733,7 @@ void DistributedSink::writeToShard(const Cluster::ShardInfo & shard_info, const
if (compression_method == "ZSTD")
compression_level = settings.network_zstd_compression_level;
CompressionCodecFactory::instance().validateCodec(compression_method, compression_level, !settings.allow_suspicious_codecs, settings.allow_experimental_codecs);
CompressionCodecFactory::instance().validateCodec(compression_method, compression_level, !settings.allow_suspicious_codecs, settings.allow_experimental_codecs, settings.enable_deflate_qpl_codec);
CompressionCodecPtr compression_codec = CompressionCodecFactory::instance().get(compression_method, compression_level);
/// tmp directory is used to ensure atomicity of transactions


@ -64,6 +64,7 @@ const char * auto_config_build[]
"USE_ARROW", "@USE_ARROW@",
"USE_ORC", "@USE_ORC@",
"USE_MSGPACK", "@USE_MSGPACK@",
"USE_QPL", "@ENABLE_QPL@",
"GIT_HASH", "@GIT_HASH@",
"GIT_BRANCH", R"IRjaNsZIL9Yh7FQ4(@GIT_BRANCH@)IRjaNsZIL9Yh7FQ4",
"GIT_DATE", "@GIT_DATE@",


@ -285,7 +285,7 @@ TTLDescription TTLDescription::getTTLFromAST(
{
result.recompression_codec =
CompressionCodecFactory::instance().validateCodecAndGetPreprocessedAST(
ttl_element->recompression_codec, {}, !context->getSettingsRef().allow_suspicious_codecs, context->getSettingsRef().allow_experimental_codecs);
ttl_element->recompression_codec, {}, !context->getSettingsRef().allow_suspicious_codecs, context->getSettingsRef().allow_experimental_codecs, context->getSettingsRef().enable_deflate_qpl_codec);
}
}


@ -20,6 +20,7 @@ def get_options(i, upgrade_check):
'''--db-engine="Replicated('/test/db/test_{}', 's1', 'r1')"'''.format(i)
)
client_options.append("allow_experimental_database_replicated=1")
client_options.append("enable_deflate_qpl_codec=1")
# If database name is not specified, new database is created for each functional test.
# Run some threads with one database for all tests.


@ -0,0 +1,11 @@
<clickhouse>
<compression>
<case>
<!-- Conditions. All must be satisfied simultaneously. Some conditions may not be specified. -->
<min_part_size>0</min_part_size> <!-- The minimum size of a part in bytes. -->
<min_part_size_ratio>0</min_part_size_ratio> <!-- The minimum size of the part relative to all the data in the table. -->
<!-- Which compression method to choose. -->
<method>deflate_qpl</method>
</case>
</compression>
</clickhouse>
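This server-level <compression> case makes deflate_qpl the default method for newly written parts that satisfy the size conditions (columns with an explicit CODEC still use their own codec). As a hedged sanity check, the codec actually used per part could be inspected; a sketch assuming the default_compression_codec column of system.parts ('some_table' is a placeholder):

SELECT name, default_compression_codec
FROM system.parts
WHERE active AND table = 'some_table';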


@ -0,0 +1,7 @@
<clickhouse>
<profiles>
<default>
<enable_deflate_qpl_codec>1</enable_deflate_qpl_codec>
</default>
</profiles>
</clickhouse>


@ -38,6 +38,14 @@ node5 = cluster.add_instance(
)
node6 = cluster.add_instance(
"node6",
main_configs=["configs/deflateqpl_compression_by_default.xml"],
user_configs=[
"configs/allow_suspicious_codecs.xml",
"configs/enable_deflateqpl_codec.xml",
],
)
node7 = cluster.add_instance(
"node7",
main_configs=["configs/allow_experimental_codecs.xml"],
user_configs=["configs/allow_suspicious_codecs.xml"],
)
@ -244,3 +252,58 @@ def test_uncompressed_cache_plus_zstd_codec(start_cluster):
)
== "10000\n"
)
def test_preconfigured_deflateqpl_codec(start_cluster):
node6.query(
"""
CREATE TABLE compression_codec_multiple_with_key (
somedate Date CODEC(ZSTD, ZSTD, ZSTD(12), LZ4HC(12), DEFLATE_QPL),
id UInt64 CODEC(LZ4, ZSTD, NONE, LZ4HC, DEFLATE_QPL),
data String CODEC(ZSTD(2), LZ4HC, NONE, LZ4, LZ4, DEFLATE_QPL),
somecolumn Float64
) ENGINE = MergeTree() PARTITION BY somedate ORDER BY id SETTINGS index_granularity = 2;
"""
)
node6.query(
"INSERT INTO compression_codec_multiple_with_key VALUES(toDate('2018-10-12'), 100000, 'hello', 88.88), (toDate('2018-10-12'), 100002, 'world', 99.99), (toDate('2018-10-12'), 1111, '!', 777.777)"
)
assert (
node6.query(
"SELECT COUNT(*) FROM compression_codec_multiple_with_key WHERE id % 2 == 0"
)
== "2\n"
)
assert (
node6.query(
"SELECT DISTINCT somecolumn FROM compression_codec_multiple_with_key ORDER BY id"
)
== "777.777\n88.88\n99.99\n"
)
assert (
node6.query(
"SELECT data FROM compression_codec_multiple_with_key WHERE id >= 1112 AND somedate = toDate('2018-10-12') AND somecolumn <= 100"
)
== "hello\nworld\n"
)
node6.query(
"INSERT INTO compression_codec_multiple_with_key SELECT toDate('2018-10-12'), number, toString(number), 1.0 FROM system.numbers LIMIT 10000"
)
assert (
node6.query(
"SELECT COUNT(id) FROM compression_codec_multiple_with_key WHERE id % 10 == 0"
)
== "1001\n"
)
assert (
node6.query("SELECT SUM(somecolumn) FROM compression_codec_multiple_with_key")
== str(777.777 + 88.88 + 99.99 + 1.0 * 10000) + "\n"
)
assert (
node6.query(
"SELECT count(*) FROM compression_codec_multiple_with_key GROUP BY somedate"
)
== "10003\n"
)


@ -0,0 +1,6 @@
CREATE TABLE default.compression_codec\n(\n `id` UInt64 CODEC(DEFLATE_QPL),\n `data` String CODEC(DEFLATE_QPL),\n `ddd` Date CODEC(DEFLATE_QPL),\n `ddd32` Date32 CODEC(DEFLATE_QPL),\n `somenum` Float64 CODEC(DEFLATE_QPL),\n `somestr` FixedString(3) CODEC(DEFLATE_QPL),\n `othernum` Int64 CODEC(DEFLATE_QPL),\n `somearray` Array(UInt8) CODEC(DEFLATE_QPL),\n `somemap` Map(String, UInt32) CODEC(DEFLATE_QPL),\n `sometuple` Tuple(UInt16, UInt64) CODEC(DEFLATE_QPL)\n)\nENGINE = MergeTree\nORDER BY tuple()\nSETTINGS index_granularity = 8192
1 hello 2018-12-14 2018-12-14 1.1 aaa 5 [1,2,3] {'k1':1,'k2':2} (1,2)
2 world 2018-12-15 2018-12-15 2.2 bbb 6 [4,5,6] {'k3':3,'k4':4} (3,4)
3 ! 2018-12-16 2018-12-16 3.3 ccc 7 [7,8,9] {'k5':5,'k6':6} (5,6)
2
10001


@ -0,0 +1,49 @@
--Tags: no-fasttest, no-cpu-aarch64
-- no-fasttest because DEFLATE_QPL isn't available in fasttest
-- no-cpu-aarch64 because DEFLATE_QPL is x86-only
-- A bunch of random DDLs to test the DEFLATE_QPL codec.
SET enable_deflate_qpl_codec = 1;
-- Suppress test failures because stderr contains the warning "Initialization of hardware-assisted DeflateQpl failed, falling
-- back to software DeflateQpl codec."
SET send_logs_level = 'fatal';
DROP TABLE IF EXISTS compression_codec;
CREATE TABLE compression_codec(
id UInt64 CODEC(DEFLATE_QPL),
data String CODEC(DEFLATE_QPL),
ddd Date CODEC(DEFLATE_QPL),
ddd32 Date32 CODEC(DEFLATE_QPL),
somenum Float64 CODEC(DEFLATE_QPL),
somestr FixedString(3) CODEC(DEFLATE_QPL),
othernum Int64 CODEC(DEFLATE_QPL),
somearray Array(UInt8) CODEC(DEFLATE_QPL),
somemap Map(String, UInt32) CODEC(DEFLATE_QPL),
sometuple Tuple(UInt16, UInt64) CODEC(DEFLATE_QPL),
) ENGINE = MergeTree() ORDER BY tuple();
SHOW CREATE TABLE compression_codec;
INSERT INTO compression_codec VALUES(1, 'hello', toDate('2018-12-14'), toDate32('2018-12-14'), 1.1, 'aaa', 5, [1,2,3], map('k1',1,'k2',2), tuple(1,2));
INSERT INTO compression_codec VALUES(2, 'world', toDate('2018-12-15'), toDate32('2018-12-15'), 2.2, 'bbb', 6, [4,5,6], map('k3',3,'k4',4), tuple(3,4));
INSERT INTO compression_codec VALUES(3, '!', toDate('2018-12-16'), toDate32('2018-12-16'), 3.3, 'ccc', 7, [7,8,9], map('k5',5,'k6',6), tuple(5,6));
SELECT * FROM compression_codec ORDER BY id;
OPTIMIZE TABLE compression_codec FINAL;
INSERT INTO compression_codec VALUES(2, '', toDate('2018-12-13'), toDate32('2018-12-13'), 4.4, 'ddd', 8, [10,11,12], map('k7',7,'k8',8), tuple(7,8));
DETACH TABLE compression_codec;
ATTACH TABLE compression_codec;
SELECT count(*) FROM compression_codec WHERE id = 2 GROUP BY id;
INSERT INTO compression_codec SELECT 3, '!', toDate('2018-12-16'), toDate32('2018-12-16'), 3.3, 'ccc', 7, [7,8,9], map('k5',5,'k6',6), tuple(5,6) FROM system.numbers LIMIT 10000;
SELECT count(*) FROM compression_codec WHERE id = 3 GROUP BY id;
DROP TABLE IF EXISTS compression_codec;
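As an optional follow-up check (not part of the test, and it would have to run before the final DROP TABLE), the codec stored for each column can be read from system.columns; a sketch:

SELECT name, compression_codec
FROM system.columns
WHERE database = currentDatabase() AND table = 'compression_codec';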