Add extensive testing cases for deflate qpl codec

This commit is contained in:
jinjunzh 2023-05-24 13:26:15 -04:00 committed by Robert Schulze
parent 5344ff2516
commit 056ca4f555
No known key found for this signature in database
GPG Key ID: 26703B55FB13728A
18 changed files with 153 additions and 37 deletions

View File

@ -380,7 +380,7 @@ High compression levels are useful for asymmetric scenarios, like compress once,
`DEFLATE_QPL` — [Deflate compression algorithm](https://github.com/intel/qpl) implemented by Intel® Query Processing Library. Some limitations apply:
- DEFLATE_QPL is experimental and can only be used after setting configuration parameter `allow_experimental_codecs=1`.
- DEFLATE_QPL is disabled by default and can only be used after setting configuration parameter `enable_qpl_deflate=1`.
- DEFLATE_QPL requires a ClickHouse build compiled with SSE 4.2 instructions (by default, this is the case). Refer to [Build Clickhouse with DEFLATE_QPL](/docs/en/development/building_and_benchmarking_deflate_qpl.md/#Build-Clickhouse-with-DEFLATE_QPL) for more details.
- DEFLATE_QPL works best if the system has a Intel® IAA (In-Memory Analytics Accelerator) offloading device. Refer to [Accelerator Configuration](https://intel.github.io/qpl/documentation/get_started_docs/installation.html#accelerator-configuration) and [Benchmark with DEFLATE_QPL](/docs/en/development/building_and_benchmarking_deflate_qpl.md/#Run-Benchmark-with-DEFLATE_QPL) for more details.
- DEFLATE_QPL-compressed data can only be transferred between ClickHouse nodes compiled with SSE 4.2 enabled.

View File

@ -588,7 +588,7 @@ void Connection::sendQuery(
if (method == "ZSTD")
level = settings->network_zstd_compression_level;
CompressionCodecFactory::instance().validateCodec(method, level, !settings->allow_suspicious_codecs, settings->allow_experimental_codecs);
CompressionCodecFactory::instance().validateCodec(method, level, !settings->allow_suspicious_codecs, settings->allow_experimental_codecs, settings->enable_qpl_deflate);
compression_codec = CompressionCodecFactory::instance().get(method, level);
}
else

View File

@ -98,7 +98,8 @@ public:
protected:
bool isCompression() const override { return true; }
bool isGenericCompression() const override { return true; }
bool isExperimental() const override { return true; }
bool isExperimental() const override { return false; }
bool isDeflateQplCompression() const override { return true; }
UInt32 doCompressData(const char * source, UInt32 source_size, char * dest) const override;
void doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size) const override;

View File

@ -40,10 +40,10 @@ public:
CompressionCodecPtr getDefaultCodec() const;
/// Validate codecs AST specified by user and parses codecs description (substitute default parameters)
ASTPtr validateCodecAndGetPreprocessedAST(const ASTPtr & ast, const DataTypePtr & column_type, bool sanity_check, bool allow_experimental_codecs) const;
ASTPtr validateCodecAndGetPreprocessedAST(const ASTPtr & ast, const DataTypePtr & column_type, bool sanity_check, bool allow_experimental_codecs, bool enable_qpl_deflate) const;
/// Validate codecs AST specified by user
void validateCodec(const String & family_name, std::optional<int> level, bool sanity_check, bool allow_experimental_codecs) const;
void validateCodec(const String & family_name, std::optional<int> level, bool sanity_check, bool allow_experimental_codecs, bool enable_qpl_deflate) const;
/// Get codec by AST and possible column_type. Some codecs can use
/// information about type to improve inner settings, but every codec should

View File

@ -34,7 +34,7 @@ namespace ErrorCodes
void CompressionCodecFactory::validateCodec(
const String & family_name, std::optional<int> level, bool sanity_check, bool allow_experimental_codecs) const
const String & family_name, std::optional<int> level, bool sanity_check, bool allow_experimental_codecs, bool enable_qpl_deflate) const
{
if (family_name.empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Compression codec name cannot be empty");
@ -43,13 +43,13 @@ void CompressionCodecFactory::validateCodec(
{
auto literal = std::make_shared<ASTLiteral>(static_cast<UInt64>(*level));
validateCodecAndGetPreprocessedAST(makeASTFunction("CODEC", makeASTFunction(Poco::toUpper(family_name), literal)),
{}, sanity_check, allow_experimental_codecs);
{}, sanity_check, allow_experimental_codecs, enable_qpl_deflate);
}
else
{
auto identifier = std::make_shared<ASTIdentifier>(Poco::toUpper(family_name));
validateCodecAndGetPreprocessedAST(makeASTFunction("CODEC", identifier),
{}, sanity_check, allow_experimental_codecs);
{}, sanity_check, allow_experimental_codecs, enable_qpl_deflate);
}
}
@ -77,7 +77,7 @@ bool innerDataTypeIsFloat(const DataTypePtr & type)
}
ASTPtr CompressionCodecFactory::validateCodecAndGetPreprocessedAST(
const ASTPtr & ast, const DataTypePtr & column_type, bool sanity_check, bool allow_experimental_codecs) const
const ASTPtr & ast, const DataTypePtr & column_type, bool sanity_check, bool allow_experimental_codecs, bool enable_qpl_deflate) const
{
if (const auto * func = ast->as<ASTFunction>())
{
@ -159,6 +159,12 @@ ASTPtr CompressionCodecFactory::validateCodecAndGetPreprocessedAST(
" You can enable it with the 'allow_experimental_codecs' setting.",
codec_family_name);
if (!enable_qpl_deflate && result_codec->isDeflateQplCompression())
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Codec {} is disabled by default."
" You can enable it with the 'enable_qpl_deflate' setting.",
codec_family_name);
codecs_descriptions->children.emplace_back(result_codec->getCodecDesc());
}

View File

@ -112,6 +112,9 @@ public:
/// If it does nothing.
virtual bool isNone() const { return false; }
/// This is a knob for Deflate QPL codec.
virtual bool isDeflateQplCompression() const { return false; }
protected:
/// This is used for fuzz testing
friend int LLVMFuzzerTestOneInput(const uint8_t * data, size_t size);

View File

@ -319,6 +319,7 @@ class IColumn;
M(Bool, allow_distributed_ddl, true, "If it is set to true, then a user is allowed to executed distributed DDL queries.", 0) \
M(Bool, allow_suspicious_codecs, false, "If it is set to true, allow to specify meaningless compression codecs.", 0) \
M(Bool, allow_experimental_codecs, false, "If it is set to true, allow to specify experimental compression codecs (but we don't have those yet and this option does nothing).", 0) \
M(Bool, enable_qpl_deflate, false, "If it is set to true, allow to use deflate_qpl for compression.", 0) \
M(UInt64, query_profiler_real_time_period_ns, QUERY_PROFILER_DEFAULT_SAMPLE_RATE_NS, "Period for real clock timer of query profiler (in nanoseconds). Set 0 value to turn off the real clock query profiler. Recommended value is at least 10000000 (100 times a second) for single queries or 1000000000 (once a second) for cluster-wide profiling.", 0) \
M(UInt64, query_profiler_cpu_time_period_ns, QUERY_PROFILER_DEFAULT_SAMPLE_RATE_NS, "Period for CPU clock timer of query profiler (in nanoseconds). Set 0 value to turn off the CPU clock query profiler. Recommended value is at least 10000000 (100 times a second) for single queries or 1000000000 (once a second) for cluster-wide profiling.", 0) \
M(Bool, metrics_perf_events_enabled, false, "If enabled, some of the perf events will be measured throughout queries' execution.", 0) \

View File

@ -571,6 +571,7 @@ ColumnsDescription InterpreterCreateQuery::getColumnsDescription(
bool sanity_check_compression_codecs = !attach && !context_->getSettingsRef().allow_suspicious_codecs;
bool allow_experimental_codecs = attach || context_->getSettingsRef().allow_experimental_codecs;
bool enable_qpl_deflate = attach || context_->getSettingsRef().enable_qpl_deflate;
ColumnsDescription res;
auto name_type_it = column_names_and_types.begin();
@ -631,7 +632,7 @@ ColumnsDescription InterpreterCreateQuery::getColumnsDescription(
if (col_decl.default_specifier == "ALIAS")
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Cannot specify codec for column type ALIAS");
column.codec = CompressionCodecFactory::instance().validateCodecAndGetPreprocessedAST(
col_decl.codec, column.type, sanity_check_compression_codecs, allow_experimental_codecs);
col_decl.codec, column.type, sanity_check_compression_codecs, allow_experimental_codecs, enable_qpl_deflate);
}
if (col_decl.ttl)

View File

@ -1775,7 +1775,7 @@ void TCPHandler::initBlockOutput(const Block & block)
if (state.compression == Protocol::Compression::Enable)
{
CompressionCodecFactory::instance().validateCodec(method, level, !query_settings.allow_suspicious_codecs, query_settings.allow_experimental_codecs);
CompressionCodecFactory::instance().validateCodec(method, level, !query_settings.allow_suspicious_codecs, query_settings.allow_experimental_codecs, query_settings.enable_qpl_deflate);
state.maybe_compressed_out = std::make_shared<CompressedWriteBuffer>(
*out, CompressionCodecFactory::instance().get(method, level));

View File

@ -388,7 +388,7 @@ void AlterCommand::apply(StorageInMemoryMetadata & metadata, ContextPtr context)
column.comment = *comment;
if (codec)
column.codec = CompressionCodecFactory::instance().validateCodecAndGetPreprocessedAST(codec, data_type, false, true);
column.codec = CompressionCodecFactory::instance().validateCodecAndGetPreprocessedAST(codec, data_type, false, true, true);
column.ttl = ttl;
@ -429,7 +429,7 @@ void AlterCommand::apply(StorageInMemoryMetadata & metadata, ContextPtr context)
else
{
if (codec)
column.codec = CompressionCodecFactory::instance().validateCodecAndGetPreprocessedAST(codec, data_type ? data_type : column.type, false, true);
column.codec = CompressionCodecFactory::instance().validateCodecAndGetPreprocessedAST(codec, data_type ? data_type : column.type, false, true, true);
if (comment)
column.comment = *comment;
@ -1067,7 +1067,7 @@ void AlterCommands::validate(const StoragePtr & table, ContextPtr context) const
"this column name is reserved for lightweight delete feature", backQuote(column_name));
if (command.codec)
CompressionCodecFactory::instance().validateCodecAndGetPreprocessedAST(command.codec, command.data_type, !context->getSettingsRef().allow_suspicious_codecs, context->getSettingsRef().allow_experimental_codecs);
CompressionCodecFactory::instance().validateCodecAndGetPreprocessedAST(command.codec, command.data_type, !context->getSettingsRef().allow_suspicious_codecs, context->getSettingsRef().allow_experimental_codecs, context->getSettingsRef().enable_qpl_deflate);
all_columns.add(ColumnDescription(column_name, command.data_type));
}
@ -1093,7 +1093,7 @@ void AlterCommands::validate(const StoragePtr & table, ContextPtr context) const
{
if (all_columns.hasAlias(column_name))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Cannot specify codec for column type ALIAS");
CompressionCodecFactory::instance().validateCodecAndGetPreprocessedAST(command.codec, command.data_type, !context->getSettingsRef().allow_suspicious_codecs, context->getSettingsRef().allow_experimental_codecs);
CompressionCodecFactory::instance().validateCodecAndGetPreprocessedAST(command.codec, command.data_type, !context->getSettingsRef().allow_suspicious_codecs, context->getSettingsRef().allow_experimental_codecs, context->getSettingsRef().enable_qpl_deflate);
}
auto column_default = all_columns.getDefault(column_name);
if (column_default)

View File

@ -130,7 +130,7 @@ void ColumnDescription::readText(ReadBuffer & buf)
comment = col_ast->comment->as<ASTLiteral &>().value.get<String>();
if (col_ast->codec)
codec = CompressionCodecFactory::instance().validateCodecAndGetPreprocessedAST(col_ast->codec, type, false, true);
codec = CompressionCodecFactory::instance().validateCodecAndGetPreprocessedAST(col_ast->codec, type, false, true, true);
if (col_ast->ttl)
ttl = col_ast->ttl;

View File

@ -733,7 +733,7 @@ void DistributedSink::writeToShard(const Cluster::ShardInfo & shard_info, const
if (compression_method == "ZSTD")
compression_level = settings.network_zstd_compression_level;
CompressionCodecFactory::instance().validateCodec(compression_method, compression_level, !settings.allow_suspicious_codecs, settings.allow_experimental_codecs);
CompressionCodecFactory::instance().validateCodec(compression_method, compression_level, !settings.allow_suspicious_codecs, settings.allow_experimental_codecs, settings.enable_qpl_deflate);
CompressionCodecPtr compression_codec = CompressionCodecFactory::instance().get(compression_method, compression_level);
/// tmp directory is used to ensure atomicity of transactions

View File

@ -285,7 +285,7 @@ TTLDescription TTLDescription::getTTLFromAST(
{
result.recompression_codec =
CompressionCodecFactory::instance().validateCodecAndGetPreprocessedAST(
ttl_element->recompression_codec, {}, !context->getSettingsRef().allow_suspicious_codecs, context->getSettingsRef().allow_experimental_codecs);
ttl_element->recompression_codec, {}, !context->getSettingsRef().allow_suspicious_codecs, context->getSettingsRef().allow_experimental_codecs, context->getSettingsRef().enable_qpl_deflate);
}
}

View File

@ -0,0 +1,11 @@
<clickhouse>
<compression>
<case>
<!-- Conditions. All must be satisfied simultaneously. Some conditions may not be specified. -->
<min_part_size>0</min_part_size> <!-- The minimum size of a part in bytes. -->
<min_part_size_ratio>0</min_part_size_ratio> <!-- The minimum size of the part relative to all the data in the table. -->
<!-- Which compression method to choose. -->
<method>deflate_qpl</method>
</case>
</compression>
</clickhouse>

View File

@ -0,0 +1,7 @@
<clickhouse>
<profiles>
<default>
<enable_qpl_deflate>1</enable_qpl_deflate>
</default>
</profiles>
</clickhouse>

View File

@ -41,7 +41,14 @@ node6 = cluster.add_instance(
main_configs=["configs/allow_experimental_codecs.xml"],
user_configs=["configs/allow_suspicious_codecs.xml"],
)
node7 = cluster.add_instance(
"node7",
main_configs=["configs/deflateqpl_compression_by_default.xml"],
user_configs=[
"configs/enable_deflateqpl_codec.xml",
"configs/allow_suspicious_codecs.xml",
],
)
@pytest.fixture(scope="module")
def start_cluster():
@ -244,3 +251,59 @@ def test_uncompressed_cache_plus_zstd_codec(start_cluster):
)
== "10000\n"
)
def test_preconfigured_deflateqpl_codec(start_cluster):
node7.query(
"""
CREATE TABLE compression_codec_multiple_with_key (
somedate Date CODEC(ZSTD, ZSTD, ZSTD(12), LZ4HC(12), DEFLATE_QPL),
id UInt64 CODEC(LZ4, ZSTD, NONE, LZ4HC, DEFLATE_QPL),
data String CODEC(ZSTD(2), LZ4HC, NONE, LZ4, LZ4, DEFLATE_QPL),
somecolumn Float64
) ENGINE = MergeTree() PARTITION BY somedate ORDER BY id SETTINGS index_granularity = 2;
"""
)
node7.query(
"INSERT INTO compression_codec_multiple_with_key VALUES(toDate('2018-10-12'), 100000, 'hello', 88.88), (toDate('2018-10-12'), 100002, 'world', 99.99), (toDate('2018-10-12'), 1111, '!', 777.777)"
)
assert (
node7.query(
"SELECT COUNT(*) FROM compression_codec_multiple_with_key WHERE id % 2 == 0"
)
== "2\n"
)
assert (
node7.query(
"SELECT DISTINCT somecolumn FROM compression_codec_multiple_with_key ORDER BY id"
)
== "777.777\n88.88\n99.99\n"
)
assert (
node7.query(
"SELECT data FROM compression_codec_multiple_with_key WHERE id >= 1112 AND somedate = toDate('2018-10-12') AND somecolumn <= 100"
)
== "hello\nworld\n"
)
node7.query(
"INSERT INTO compression_codec_multiple_with_key SELECT toDate('2018-10-12'), number, toString(number), 1.0 FROM system.numbers LIMIT 10000"
)
assert (
node7.query(
"SELECT COUNT(id) FROM compression_codec_multiple_with_key WHERE id % 10 == 0"
)
== "1001\n"
)
assert (
node7.query(
"SELECT SUM(somecolumn) FROM compression_codec_multiple_with_key"
)
== str(777.777 + 88.88 + 99.99 + 1.0 * 10000) + "\n"
)
assert (
node7.query(
"SELECT count(*) FROM compression_codec_multiple_with_key GROUP BY somedate"
)
== "10003\n"
)

View File

@ -12,13 +12,7 @@ CODEC(NONE)
2018-01-01 4 4
2018-01-01 5 5
2018-01-01 6 6
2018-01-01 1 default_value
2018-01-01 2 default_value
2018-01-01 3 3
2018-01-01 4 4
2018-01-01 5 5
2018-01-01 6 6
CODEC(ZSTD(1), LZ4HC(0), LZ4, LZ4, NONE)
CODEC(DEFLATE_QPL)
2018-01-01 1 default_value
2018-01-01 2 default_value
2018-01-01 3 3
@ -27,7 +21,26 @@ CODEC(ZSTD(1), LZ4HC(0), LZ4, LZ4, NONE)
2018-01-01 6 6
2018-01-01 7 7
2018-01-01 8 8
CODEC(ZSTD(1), LZ4HC(0), LZ4, LZ4, NONE)
CODEC(NONE, LZ4, LZ4HC(0), ZSTD(1))
2018-01-01 1 default_value
2018-01-01 2 default_value
2018-01-01 3 3
2018-01-01 4 4
2018-01-01 5 5
2018-01-01 6 6
2018-01-01 7 7
2018-01-01 8 8
CODEC(ZSTD(1), LZ4HC(0), LZ4, LZ4, DEFLATE_QPL, NONE)
2018-01-01 1 default_value
2018-01-01 2 default_value
2018-01-01 3 3
2018-01-01 4 4
2018-01-01 5 5
2018-01-01 6 6
2018-01-01 7 7
2018-01-01 8 8
2018-01-01 9 9
2018-01-01 10 10
CODEC(ZSTD(1), LZ4HC(0), LZ4, LZ4, DEFLATE_QPL, NONE)
CODEC(NONE, LZ4, LZ4HC(0), ZSTD(1), DEFLATE_QPL)
2
1

View File

@ -25,15 +25,23 @@ INSERT INTO alter_compression_codec VALUES('2018-01-01', 5, '5');
INSERT INTO alter_compression_codec VALUES('2018-01-01', 6, '6');
SELECT * FROM alter_compression_codec ORDER BY id;
OPTIMIZE TABLE alter_compression_codec FINAL;
SELECT * FROM alter_compression_codec ORDER BY id;
SET allow_suspicious_codecs = 1;
ALTER TABLE alter_compression_codec MODIFY COLUMN alter_column CODEC(ZSTD, LZ4HC, LZ4, LZ4, NONE);
SET enable_qpl_deflate = 1;
ALTER TABLE alter_compression_codec MODIFY COLUMN alter_column CODEC(DEFLATE_QPL);
SELECT compression_codec FROM system.columns WHERE database = currentDatabase() AND table = 'alter_compression_codec' AND name = 'alter_column';
INSERT INTO alter_compression_codec VALUES('2018-01-01', 7, '7');
INSERT INTO alter_compression_codec VALUES('2018-01-01', 8, '8');
SELECT * FROM alter_compression_codec ORDER BY id;
OPTIMIZE TABLE alter_compression_codec FINAL;
SELECT * FROM alter_compression_codec ORDER BY id;
SET allow_suspicious_codecs = 1;
ALTER TABLE alter_compression_codec MODIFY COLUMN alter_column CODEC(ZSTD, LZ4HC, LZ4, LZ4, DEFLATE_QPL, NONE);
SELECT compression_codec FROM system.columns WHERE database = currentDatabase() AND table = 'alter_compression_codec' AND name = 'alter_column';
INSERT INTO alter_compression_codec VALUES('2018-01-01', 9, '9');
INSERT INTO alter_compression_codec VALUES('2018-01-01', 10, '10');
OPTIMIZE TABLE alter_compression_codec FINAL;
SELECT * FROM alter_compression_codec ORDER BY id;
@ -54,15 +62,17 @@ ALTER TABLE alter_bad_codec ADD COLUMN alter_column DateTime DEFAULT '2019-01-01
ALTER TABLE alter_bad_codec ADD COLUMN alter_column DateTime DEFAULT '2019-01-01 00:00:00' CODEC(ZSTD(100)); -- { serverError 433 }
ALTER TABLE alter_bad_codec ADD COLUMN alter_column DateTime DEFAULT '2019-01-01 00:00:00' CODEC(DEFLATE_QPL(100)); -- { serverError 378 }
DROP TABLE IF EXISTS alter_bad_codec;
DROP TABLE IF EXISTS large_alter_table_00804;
DROP TABLE IF EXISTS store_of_hash_00804;
CREATE TABLE large_alter_table_00804 (
somedate Date CODEC(ZSTD, ZSTD, ZSTD(12), LZ4HC(12)),
id UInt64 CODEC(LZ4, ZSTD, NONE, LZ4HC),
data String CODEC(ZSTD(2), LZ4HC, NONE, LZ4, LZ4)
somedate Date CODEC(ZSTD, ZSTD, ZSTD(12), LZ4HC(12), DEFLATE_QPL),
id UInt64 CODEC(LZ4, ZSTD, NONE, LZ4HC, DEFLATE_QPL),
data String CODEC(ZSTD(2), LZ4HC, NONE, LZ4, LZ4, DEFLATE_QPL)
) ENGINE = MergeTree() PARTITION BY somedate ORDER BY id SETTINGS index_granularity = 2, index_granularity_bytes = '10Mi', min_bytes_for_wide_part = 0;
INSERT INTO large_alter_table_00804 SELECT toDate('2019-01-01'), number, toString(number + rand()) FROM system.numbers LIMIT 300000;
@ -71,7 +81,7 @@ CREATE TABLE store_of_hash_00804 (hash UInt64) ENGINE = Memory();
INSERT INTO store_of_hash_00804 SELECT sum(cityHash64(*)) FROM large_alter_table_00804;
ALTER TABLE large_alter_table_00804 MODIFY COLUMN data CODEC(NONE, LZ4, LZ4HC, ZSTD);
ALTER TABLE large_alter_table_00804 MODIFY COLUMN data CODEC(NONE, LZ4, LZ4HC, ZSTD, DEFLATE_QPL);
OPTIMIZE TABLE large_alter_table_00804;