Merge pull request #11162 from azat/data-skip-index-merging-params-fix

[RFC] Fix data skipping indexes for columns with additional actions during merge
This commit is contained in:
alexey-milovidov 2020-05-31 19:10:53 +03:00 committed by GitHub
commit 8accde79b5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 71 additions and 5 deletions

View File

@ -423,6 +423,7 @@ void MergeTreeData::setProperties(const StorageInMemoryMetadata & metadata, bool
} }
ASTPtr skip_indices_with_primary_key_expr_list = new_primary_key_expr_list->clone(); ASTPtr skip_indices_with_primary_key_expr_list = new_primary_key_expr_list->clone();
ASTPtr skip_indices_expr_list = new_primary_key_expr_list->clone();
ASTPtr skip_indices_with_sorting_key_expr_list = new_sorting_key_expr_list->clone(); ASTPtr skip_indices_with_sorting_key_expr_list = new_sorting_key_expr_list->clone();
MergeTreeIndices new_indices; MergeTreeIndices new_indices;
@ -452,6 +453,7 @@ void MergeTreeData::setProperties(const StorageInMemoryMetadata & metadata, bool
{ {
skip_indices_with_primary_key_expr_list->children.push_back(expr->clone()); skip_indices_with_primary_key_expr_list->children.push_back(expr->clone());
skip_indices_with_sorting_key_expr_list->children.push_back(expr->clone()); skip_indices_with_sorting_key_expr_list->children.push_back(expr->clone());
skip_indices_expr_list->children.push_back(expr->clone());
} }
indices_names.insert(new_indices.back()->name); indices_names.insert(new_indices.back()->name);
@ -462,6 +464,11 @@ void MergeTreeData::setProperties(const StorageInMemoryMetadata & metadata, bool
auto new_indices_with_primary_key_expr = ExpressionAnalyzer( auto new_indices_with_primary_key_expr = ExpressionAnalyzer(
skip_indices_with_primary_key_expr_list, syntax_primary, global_context).getActions(false); skip_indices_with_primary_key_expr_list, syntax_primary, global_context).getActions(false);
auto syntax_indices = SyntaxAnalyzer(global_context).analyze(
skip_indices_with_primary_key_expr_list, all_columns);
auto new_indices_expr = ExpressionAnalyzer(
skip_indices_expr_list, syntax_indices, global_context).getActions(false);
auto syntax_sorting = SyntaxAnalyzer(global_context).analyze( auto syntax_sorting = SyntaxAnalyzer(global_context).analyze(
skip_indices_with_sorting_key_expr_list, all_columns); skip_indices_with_sorting_key_expr_list, all_columns);
auto new_indices_with_sorting_key_expr = ExpressionAnalyzer( auto new_indices_with_sorting_key_expr = ExpressionAnalyzer(
@ -494,6 +501,7 @@ void MergeTreeData::setProperties(const StorageInMemoryMetadata & metadata, bool
setConstraints(metadata.constraints); setConstraints(metadata.constraints);
skip_indices_expr = new_indices_expr;
primary_key_and_skip_indices_expr = new_indices_with_primary_key_expr; primary_key_and_skip_indices_expr = new_indices_with_primary_key_expr;
sorting_key_and_skip_indices_expr = new_indices_with_sorting_key_expr; sorting_key_and_skip_indices_expr = new_indices_with_sorting_key_expr;
} }

View File

@ -647,6 +647,7 @@ public:
/// Secondary (data skipping) indices for MergeTree /// Secondary (data skipping) indices for MergeTree
MergeTreeIndices skip_indices; MergeTreeIndices skip_indices;
ExpressionActionsPtr skip_indices_expr;
ExpressionActionsPtr primary_key_and_skip_indices_expr; ExpressionActionsPtr primary_key_and_skip_indices_expr;
ExpressionActionsPtr sorting_key_and_skip_indices_expr; ExpressionActionsPtr sorting_key_and_skip_indices_expr;

View File

@ -715,13 +715,10 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
Pipe pipe(std::move(input)); Pipe pipe(std::move(input));
if (data.hasPrimaryKey() || data.hasSkipIndices()) if (data.hasSortingKey())
{ {
auto expr = std::make_shared<ExpressionTransform>(pipe.getHeader(), data.sorting_key_and_skip_indices_expr); auto expr = std::make_shared<ExpressionTransform>(pipe.getHeader(), data.sorting_key_expr);
pipe.addSimpleTransform(std::move(expr)); pipe.addSimpleTransform(std::move(expr));
auto materializing = std::make_shared<MaterializingTransform>(pipe.getHeader());
pipe.addSimpleTransform(std::move(materializing));
} }
pipes.emplace_back(std::move(pipe)); pipes.emplace_back(std::move(pipe));
@ -796,6 +793,12 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
if (need_remove_expired_values) if (need_remove_expired_values)
merged_stream = std::make_shared<TTLBlockInputStream>(merged_stream, data, new_data_part, time_of_merge, force_ttl); merged_stream = std::make_shared<TTLBlockInputStream>(merged_stream, data, new_data_part, time_of_merge, force_ttl);
if (data.hasSkipIndices())
{
merged_stream = std::make_shared<ExpressionBlockInputStream>(merged_stream, data.skip_indices_expr);
merged_stream = std::make_shared<MaterializingBlockInputStream>(merged_stream);
}
MergedBlockOutputStream to{ MergedBlockOutputStream to{
new_data_part, new_data_part,
merging_columns, merging_columns,

View File

@ -0,0 +1,20 @@
INSERT
1 0
1 1
1 1
INSERT
1 0
1 1
1 0
1 1
1 2
1 3
1 1
1 1
1 3
OPTIMIZE
1 3
1 3
OPTIMIZE
1 3
1 3

View File

@ -0,0 +1,34 @@
-- Regression test for a data skipping index defined on an expression that
-- needs additional actions (assumeNotNull over a SimpleAggregateFunction
-- column): filtering by that expression must still find the rows after the
-- parts have been merged with OPTIMIZE ... FINAL.
DROP TABLE IF EXISTS data_01285;
-- NOTE(review): presumably single-threaded reads keep the output order stable; confirm.
SET max_threads=1;
-- value_idx is a minmax index over assumeNotNull(value), not over the raw column.
CREATE TABLE data_01285 (
key Int,
value SimpleAggregateFunction(max, Nullable(Int)),
INDEX value_idx assumeNotNull(value) TYPE minmax GRANULARITY 1
)
ENGINE=AggregatingMergeTree()
ORDER BY key;
-- Section marker printed into the test output.
SELECT 'INSERT';
-- First insert: rows (1, 0) and (1, 1).
INSERT INTO data_01285 SELECT 1, number FROM numbers(2);
SELECT * FROM data_01285;
SELECT * FROM data_01285 WHERE assumeNotNull(value) = 1;
SELECT 'INSERT';
-- Second insert: rows (1, 0) .. (1, 3), all with the same key as the first insert.
INSERT INTO data_01285 SELECT 1, number FROM numbers(4);
SELECT * FROM data_01285;
SELECT * FROM data_01285 WHERE assumeNotNull(value) = 1;
SELECT * FROM data_01285 WHERE assumeNotNull(value) = 3;
SELECT 'OPTIMIZE';
-- Force a merge of the inserted data.
OPTIMIZE TABLE data_01285 FINAL;
SELECT * FROM data_01285;
-- Before the fix, value_idx contained one range {0, 0}
-- and hence this query could not find the record.
SELECT * FROM data_01285 WHERE assumeNotNull(value) = 3;
-- One more time, just in case.
SELECT 'OPTIMIZE';
OPTIMIZE TABLE data_01285 FINAL;
SELECT * FROM data_01285;
-- This query passes even without the fix.
SELECT * FROM data_01285 WHERE assumeNotNull(value) = 3;