Merge pull request #64391 from CurtizJ/materialize-indexes-setting

Added settings to disable materialization of skip indexes and statistics on inserts
Anton Popov 2024-06-01 15:02:17 +00:00 committed by GitHub
commit 3663da3cc3
10 changed files with 192 additions and 13 deletions

View File

@@ -260,6 +260,8 @@ class IColumn;
     M(Bool, force_primary_key, false, "Throw an exception if there is primary key in a table, and it is not used.", 0) \
     M(Bool, use_skip_indexes, true, "Use data skipping indexes during query execution.", 0) \
     M(Bool, use_skip_indexes_if_final, false, "If query has FINAL, then skipping data based on indexes may produce incorrect result, hence disabled by default.", 0) \
+    M(Bool, materialize_skip_indexes_on_insert, true, "If true, skip indexes are calculated on inserts; otherwise, skip indexes will be calculated only during merges.", 0) \
+    M(Bool, materialize_statistics_on_insert, true, "If true, statistics are calculated on inserts; otherwise, statistics will be calculated only during merges.", 0) \
     M(String, ignore_data_skipping_indices, "", "Comma separated list of strings or literals with the name of the data skipping indices that should be excluded during query execution.", 0) \
 \
     M(String, force_data_skipping_indices, "", "Comma separated list of strings or literals with the name of the data skipping indices that should be used during query execution, otherwise an exception will be thrown.", 0) \
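
Usage sketch (not from the diff; table names are hypothetical): the new settings can be disabled per session so that inserts skip the extra work:

    SET materialize_skip_indexes_on_insert = 0;
    SET materialize_statistics_on_insert = 0;

    -- Inserted parts now carry no skip-index or statistics files; both can be
    -- built later, during background merges or via an explicit
    -- ALTER TABLE ... MATERIALIZE INDEX / ALTER TABLE ... MATERIALIZE STATISTIC.
    INSERT INTO hypothetical_table SELECT * FROM hypothetical_source;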

View File

@@ -85,7 +85,9 @@ namespace SettingsChangesHistory
 /// It's used to implement `compatibility` setting (see https://github.com/ClickHouse/ClickHouse/issues/35972)
 static std::map<ClickHouseVersion, SettingsChangesHistory::SettingsChanges> settings_changes_history =
 {
-    {"24.6", {{"input_format_parquet_use_native_reader", false, false, "When reading Parquet files, to use native reader instead of arrow reader."},
+    {"24.6", {{"materialize_skip_indexes_on_insert", true, true, "Added new setting to allow to disable materialization of skip indexes on insert"},
+              {"materialize_statistics_on_insert", true, true, "Added new setting to allow to disable materialization of statistics on insert"},
+              {"input_format_parquet_use_native_reader", false, false, "When reading Parquet files, to use native reader instead of arrow reader."},
               {"hdfs_throw_on_zero_files_match", false, false, "Allow to throw an error when ListObjects request cannot match any files in HDFS engine instead of empty query result"},
               {"azure_throw_on_zero_files_match", false, false, "Allow to throw an error when ListObjects request cannot match any files in AzureBlobStorage engine instead of empty query result"},
               {"s3_validate_request_settings", true, true, "Allow to disable S3 request settings validation"},

View File

@@ -176,6 +176,7 @@ MergeTreeDataPartWriterOnDisk::MergeTreeDataPartWriterOnDisk(
     if (settings.rewrite_primary_key)
         initPrimaryIndex();
+
     initSkipIndices();
     initStatistics();
 }

@@ -272,6 +273,9 @@ void MergeTreeDataPartWriterOnDisk::initStatistics()
 void MergeTreeDataPartWriterOnDisk::initSkipIndices()
 {
+    if (skip_indices.empty())
+        return;
+
     ParserCodec codec_parser;
     auto ast = parseQuery(codec_parser, "(" + Poco::toUpper(settings.marks_compression_codec) + ")", 0, DBMS_DEFAULT_MAX_PARSER_DEPTH, DBMS_DEFAULT_MAX_PARSER_BACKTRACKS);
     CompressionCodecPtr marks_compression_codec = CompressionCodecFactory::instance().get(ast, nullptr);

View File

@@ -464,7 +464,13 @@ MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeTempPartImpl(
     temp_part.temporary_directory_lock = data.getTemporaryPartDirectoryHolder(part_dir);

-    auto indices = MergeTreeIndexFactory::instance().getMany(metadata_snapshot->getSecondaryIndices());
+    MergeTreeIndices indices;
+    if (context->getSettingsRef().materialize_skip_indexes_on_insert)
+        indices = MergeTreeIndexFactory::instance().getMany(metadata_snapshot->getSecondaryIndices());
+
+    Statistics statistics;
+    if (context->getSettingsRef().materialize_statistics_on_insert)
+        statistics = MergeTreeStatisticsFactory::instance().getMany(metadata_snapshot->getColumns());

     /// If we need to calculate some columns to sort.
     if (metadata_snapshot->hasSortingKey() || metadata_snapshot->hasSecondaryIndices())

@@ -596,7 +602,7 @@ MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeTempPartImpl(
         metadata_snapshot,
         columns,
         indices,
-        MergeTreeStatisticsFactory::instance().getMany(metadata_snapshot->getColumns()),
+        statistics,
         compression_codec,
         context->getCurrentTransaction() ? context->getCurrentTransaction()->tid : Tx::PrehistoricTID,
         false,
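
Usage sketch (not from the diff; table names are hypothetical): because the flags are read from the query context, they can also be applied to a single INSERT:

    INSERT INTO hypothetical_table
    SETTINGS materialize_skip_indexes_on_insert = 0, materialize_statistics_on_insert = 0
    SELECT * FROM hypothetical_source;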

View File

@@ -261,9 +261,9 @@ void MergeTreeWhereOptimizer::analyzeImpl(Conditions & res, const RPNBuilderTree
         cond.columns_size = getColumnsSize(cond.table_columns);

         cond.viable =
-            !has_invalid_column &&
+            !has_invalid_column
             /// Condition depend on some column. Constant expressions are not moved.
-            !cond.table_columns.empty()
+            && !cond.table_columns.empty()
             && !cannotBeMoved(node, where_optimizer_context)
             /// When use final, do not take into consideration the conditions with non-sorting keys. Because final select
             /// need to use all sorting keys, it will cause correctness issues if we filter other columns before final merge.

@@ -273,17 +273,15 @@ void MergeTreeWhereOptimizer::analyzeImpl(Conditions & res, const RPNBuilderTree
             /// Do not move conditions involving all queried columns.
             && cond.table_columns.size() < queried_columns.size();

-        if (cond.viable)
-            cond.good = isConditionGood(node, table_columns);
-
         if (where_optimizer_context.use_statistic)
         {
             cond.good = cond.viable;
             cond.selectivity = estimator.estimateSelectivity(node);
-            if (node.getASTNode() != nullptr)
-                LOG_TEST(log, "Condition {} has selectivity {}", node.getASTNode()->dumpTree(), cond.selectivity);
+            LOG_TEST(log, "Condition {} has selectivity {}", node.getColumnName(), cond.selectivity);
         }
+        else if (cond.viable)
+        {
+            cond.good = isConditionGood(node, table_columns);
+        }

         if (where_optimizer_context.move_primary_key_columns_to_end_of_prewhere)

@@ -363,6 +361,7 @@ std::optional<MergeTreeWhereOptimizer::OptimizeResult> MergeTreeWhereOptimizer::
     /// Move condition and all other conditions depend on the same set of columns.
     auto move_condition = [&](Conditions::iterator cond_it)
     {
+        LOG_TRACE(log, "Condition {} moved to PREWHERE", cond_it->node.getColumnName());
         prewhere_conditions.splice(prewhere_conditions.end(), where_conditions, cond_it);
         total_size_of_moved_conditions += cond_it->columns_size;
         total_number_of_moved_columns += cond_it->table_columns.size();

@@ -371,9 +370,14 @@ std::optional<MergeTreeWhereOptimizer::OptimizeResult> MergeTreeWhereOptimizer::
         for (auto jt = where_conditions.begin(); jt != where_conditions.end();)
         {
             if (jt->viable && jt->columns_size == cond_it->columns_size && jt->table_columns == cond_it->table_columns)
+            {
+                LOG_TRACE(log, "Condition {} moved to PREWHERE", jt->node.getColumnName());
                 prewhere_conditions.splice(prewhere_conditions.end(), where_conditions, jt++);
+            }
             else
+            {
                 ++jt;
+            }
         }
     };
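
Observation sketch (not from the diff): the new LOG_TRACE line makes PREWHERE moves visible in system.text_log (assuming the text_log table is enabled on the server):

    SYSTEM FLUSH LOGS;

    SELECT message FROM system.text_log
    WHERE message LIKE '%moved to PREWHERE%'
    ORDER BY event_time_microseconds DESC
    LIMIT 5;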

View File

@@ -112,7 +112,7 @@ Float64 ConditionEstimator::estimateSelectivity(const RPNBuilderTreeNode & node)
     auto [op, val] = extractBinaryOp(node, col);
     if (op == "equals")
     {
-        if (val < - threshold || val > threshold)
+        if (val < -threshold || val > threshold)
             return default_normal_cond_factor;
         else
             return default_good_cond_factor;

View File

@@ -0,0 +1,52 @@
20
Expression ((Project names + Projection))
  Aggregating
    Expression (Before GROUP BY)
      Expression
        ReadFromMergeTree (default.t_skip_index_insert)
        Indexes:
          Skip
            Name: idx_a
            Description: minmax GRANULARITY 1
            Parts: 2/2
            Granules: 50/50
          Skip
            Name: idx_b
            Description: set GRANULARITY 1
            Parts: 2/2
            Granules: 50/50
20
Expression ((Project names + Projection))
  Aggregating
    Expression (Before GROUP BY)
      Expression
        ReadFromMergeTree (default.t_skip_index_insert)
        Indexes:
          Skip
            Name: idx_a
            Description: minmax GRANULARITY 1
            Parts: 1/1
            Granules: 6/50
          Skip
            Name: idx_b
            Description: set GRANULARITY 1
            Parts: 1/1
            Granules: 6/6
20
Expression ((Project names + Projection))
  Aggregating
    Expression (Before GROUP BY)
      Expression
        ReadFromMergeTree (default.t_skip_index_insert)
        Indexes:
          Skip
            Name: idx_a
            Description: minmax GRANULARITY 1
            Parts: 1/2
            Granules: 6/50
          Skip
            Name: idx_b
            Description: set GRANULARITY 1
            Parts: 1/1
            Granules: 6/6
4 0

View File

@@ -0,0 +1,50 @@
DROP TABLE IF EXISTS t_skip_index_insert;

CREATE TABLE t_skip_index_insert
(
    a UInt64,
    b UInt64,
    INDEX idx_a a TYPE minmax,
    INDEX idx_b b TYPE set(3)
)
ENGINE = MergeTree ORDER BY tuple() SETTINGS index_granularity = 4;

SET allow_experimental_analyzer = 1;
SET materialize_skip_indexes_on_insert = 0;

SYSTEM STOP MERGES t_skip_index_insert;

INSERT INTO t_skip_index_insert SELECT number, number / 50 FROM numbers(100);
INSERT INTO t_skip_index_insert SELECT number, number / 50 FROM numbers(100, 100);

SELECT count() FROM t_skip_index_insert WHERE a >= 110 AND a < 130 AND b = 2;
EXPLAIN indexes = 1 SELECT count() FROM t_skip_index_insert WHERE a >= 110 AND a < 130 AND b = 2;

SYSTEM START MERGES t_skip_index_insert;
OPTIMIZE TABLE t_skip_index_insert FINAL;

SELECT count() FROM t_skip_index_insert WHERE a >= 110 AND a < 130 AND b = 2;
EXPLAIN indexes = 1 SELECT count() FROM t_skip_index_insert WHERE a >= 110 AND a < 130 AND b = 2;

TRUNCATE TABLE t_skip_index_insert;

INSERT INTO t_skip_index_insert SELECT number, number / 50 FROM numbers(100);
INSERT INTO t_skip_index_insert SELECT number, number / 50 FROM numbers(100, 100);

SET mutations_sync = 2;

ALTER TABLE t_skip_index_insert MATERIALIZE INDEX idx_a;
ALTER TABLE t_skip_index_insert MATERIALIZE INDEX idx_b;

SELECT count() FROM t_skip_index_insert WHERE a >= 110 AND a < 130 AND b = 2;
EXPLAIN indexes = 1 SELECT count() FROM t_skip_index_insert WHERE a >= 110 AND a < 130 AND b = 2;

DROP TABLE IF EXISTS t_skip_index_insert;

SYSTEM FLUSH LOGS;

SELECT count(), sum(ProfileEvents['MergeTreeDataWriterSkipIndicesCalculationMicroseconds'])
FROM system.query_log
WHERE current_database = currentDatabase()
    AND query LIKE 'INSERT INTO t_skip_index_insert SELECT%'
    AND type = 'QueryFinish';

View File

@@ -0,0 +1,10 @@
10
10
10
statistic not used Condition less(b, 10_UInt8) moved to PREWHERE
statistic not used Condition less(a, 10_UInt8) moved to PREWHERE
statistic used after merge Condition less(a, 10_UInt8) moved to PREWHERE
statistic used after merge Condition less(b, 10_UInt8) moved to PREWHERE
statistic used after materialize Condition less(a, 10_UInt8) moved to PREWHERE
statistic used after materialize Condition less(b, 10_UInt8) moved to PREWHERE
2 0

View File

@@ -0,0 +1,49 @@
DROP TABLE IF EXISTS t_statistic_materialize;

SET allow_experimental_analyzer = 1;
SET allow_experimental_statistic = 1;
SET allow_statistic_optimize = 1;
SET materialize_statistics_on_insert = 0;

CREATE TABLE t_statistic_materialize
(
    a Int64 STATISTIC(tdigest),
    b Int16 STATISTIC(tdigest),
) ENGINE = MergeTree() ORDER BY tuple()
SETTINGS min_bytes_for_wide_part = 0, enable_vertical_merge_algorithm = 0; -- TODO: there is a bug in vertical merge with statistics.

INSERT INTO t_statistic_materialize SELECT number, -number FROM system.numbers LIMIT 10000;

SELECT count(*) FROM t_statistic_materialize WHERE b < 10 and a < 10 SETTINGS log_comment = 'statistic not used';

OPTIMIZE TABLE t_statistic_materialize FINAL;

SELECT count(*) FROM t_statistic_materialize WHERE b < 10 and a < 10 SETTINGS log_comment = 'statistic used after merge';

TRUNCATE TABLE t_statistic_materialize;
SET mutations_sync = 2;

INSERT INTO t_statistic_materialize SELECT number, -number FROM system.numbers LIMIT 10000;
ALTER TABLE t_statistic_materialize MATERIALIZE STATISTIC a, b TYPE tdigest;

SELECT count(*) FROM t_statistic_materialize WHERE b < 10 and a < 10 SETTINGS log_comment = 'statistic used after materialize';

DROP TABLE t_statistic_materialize;

SYSTEM FLUSH LOGS;

SELECT log_comment, message FROM system.text_log JOIN
(
    SELECT Settings['log_comment'] AS log_comment, query_id FROM system.query_log
    WHERE current_database = currentDatabase()
        AND query LIKE 'SELECT count(*) FROM t_statistic_materialize%'
        AND type = 'QueryFinish'
) AS query_log USING (query_id)
WHERE message LIKE '%moved to PREWHERE%'
ORDER BY event_time_microseconds;

SELECT count(), sum(ProfileEvents['MergeTreeDataWriterStatisticsCalculationMicroseconds'])
FROM system.query_log
WHERE current_database = currentDatabase()
    AND query LIKE 'INSERT INTO t_statistic_materialize SELECT%'
    AND type = 'QueryFinish';