From c977c33d6d60f042d3c1b7452cc982be17b01d14 Mon Sep 17 00:00:00 2001 From: alesapin Date: Sun, 27 Jun 2021 19:18:15 +0300 Subject: [PATCH 1/4] Fix bug in execution of TTL GROUP BY --- src/DataStreams/TTLAggregationAlgorithm.cpp | 154 ++++++++++-------- src/DataStreams/TTLColumnAlgorithm.cpp | 2 + src/DataStreams/TTLDeleteAlgorithm.cpp | 2 + .../MergeTree/MergeTreeDataPartTTLInfo.cpp | 51 ++++++ .../MergeTree/MergeTreeDataPartTTLInfo.h | 9 + src/Storages/MergeTree/TTLMergeSelector.cpp | 3 + src/Storages/StorageReplicatedMergeTree.cpp | 4 + tests/integration/test_ttl_replicated/test.py | 13 +- .../test.py | 5 +- 9 files changed, 174 insertions(+), 69 deletions(-) diff --git a/src/DataStreams/TTLAggregationAlgorithm.cpp b/src/DataStreams/TTLAggregationAlgorithm.cpp index 9a1cf45772f..6d5c234a074 100644 --- a/src/DataStreams/TTLAggregationAlgorithm.cpp +++ b/src/DataStreams/TTLAggregationAlgorithm.cpp @@ -36,88 +36,110 @@ TTLAggregationAlgorithm::TTLAggregationAlgorithm( storage_.getContext()->getTemporaryVolume(), settings.max_threads, settings.min_free_disk_space_for_temporary_data); aggregator = std::make_unique(params); + + if (isMinTTLExpired()) + new_ttl_info.finished = true; } void TTLAggregationAlgorithm::execute(Block & block) { - if (!block) - { - if (!aggregation_result.empty()) - { - MutableColumns result_columns = header.cloneEmptyColumns(); - finalizeAggregates(result_columns); - block = header.cloneWithColumns(std::move(result_columns)); - } - return; - } - - const auto & column_names = header.getNames(); + bool some_rows_were_aggregated = false; MutableColumns result_columns = header.cloneEmptyColumns(); - MutableColumns aggregate_columns = header.cloneEmptyColumns(); - auto ttl_column = executeExpressionAndGetColumn(description.expression, block, description.result_column); - auto where_column = executeExpressionAndGetColumn(description.where_expression, block, description.where_result_column); - - size_t rows_aggregated = 0; - size_t 
current_key_start = 0; - size_t rows_with_current_key = 0; - - for (size_t i = 0; i < block.rows(); ++i) + if (!block) /// Empty block -- no more data, but we may still have some accumulated rows { - UInt32 cur_ttl = getTimestampByIndex(ttl_column.get(), i); - bool where_filter_passed = !where_column || where_column->getBool(i); - bool ttl_expired = isTTLExpired(cur_ttl) && where_filter_passed; - - bool same_as_current = true; - for (size_t j = 0; j < description.group_by_keys.size(); ++j) + if (!aggregation_result.empty()) /// Still have some aggregated data, let's update TTL { - const String & key_column = description.group_by_keys[j]; - const IColumn * values_column = block.getByName(key_column).column.get(); - if (!same_as_current || (*values_column)[i] != current_key_value[j]) - { - values_column->get(i, current_key_value[j]); - same_as_current = false; - } - } - - if (!same_as_current) - { - if (rows_with_current_key) - calculateAggregates(aggregate_columns, current_key_start, rows_with_current_key); finalizeAggregates(result_columns); - - current_key_start = rows_aggregated; - rows_with_current_key = 0; + some_rows_were_aggregated = true; } - - if (ttl_expired) + else /// No block, all aggregated, just finish { - ++rows_with_current_key; - ++rows_aggregated; - for (const auto & name : column_names) - { - const IColumn * values_column = block.getByName(name).column.get(); - auto & column = aggregate_columns[header.getPositionByName(name)]; - column->insertFrom(*values_column, i); - } - } - else - { - new_ttl_info.update(cur_ttl); - for (const auto & name : column_names) - { - const IColumn * values_column = block.getByName(name).column.get(); - auto & column = result_columns[header.getPositionByName(name)]; - column->insertFrom(*values_column, i); - } + return; } } + else + { + const auto & column_names = header.getNames(); + MutableColumns aggregate_columns = header.cloneEmptyColumns(); - if (rows_with_current_key) - calculateAggregates(aggregate_columns, 
current_key_start, rows_with_current_key); + auto ttl_column = executeExpressionAndGetColumn(description.expression, block, description.result_column); + auto where_column = executeExpressionAndGetColumn(description.where_expression, block, description.where_result_column); + + size_t rows_aggregated = 0; + size_t current_key_start = 0; + size_t rows_with_current_key = 0; + + for (size_t i = 0; i < block.rows(); ++i) + { + UInt32 cur_ttl = getTimestampByIndex(ttl_column.get(), i); + bool where_filter_passed = !where_column || where_column->getBool(i); + bool ttl_expired = isTTLExpired(cur_ttl) && where_filter_passed; + + bool same_as_current = true; + for (size_t j = 0; j < description.group_by_keys.size(); ++j) + { + const String & key_column = description.group_by_keys[j]; + const IColumn * values_column = block.getByName(key_column).column.get(); + if (!same_as_current || (*values_column)[i] != current_key_value[j]) + { + values_column->get(i, current_key_value[j]); + same_as_current = false; + } + } + + if (!same_as_current) + { + if (rows_with_current_key) + { + some_rows_were_aggregated = true; + calculateAggregates(aggregate_columns, current_key_start, rows_with_current_key); + } + finalizeAggregates(result_columns); + + current_key_start = rows_aggregated; + rows_with_current_key = 0; + } + + if (ttl_expired) + { + ++rows_with_current_key; + ++rows_aggregated; + for (const auto & name : column_names) + { + const IColumn * values_column = block.getByName(name).column.get(); + auto & column = aggregate_columns[header.getPositionByName(name)]; + column->insertFrom(*values_column, i); + } + } + else + { + for (const auto & name : column_names) + { + const IColumn * values_column = block.getByName(name).column.get(); + auto & column = result_columns[header.getPositionByName(name)]; + column->insertFrom(*values_column, i); + } + } + } + + if (rows_with_current_key) + { + some_rows_were_aggregated = true; + calculateAggregates(aggregate_columns, current_key_start, 
rows_with_current_key); + } + } block = header.cloneWithColumns(std::move(result_columns)); + + /// If some rows were aggregated we have to recalculate ttl info's + if (some_rows_were_aggregated) + { + auto ttl_column_after_aggregation = executeExpressionAndGetColumn(description.expression, block, description.result_column); + for (size_t i = 0; i < block.rows(); ++i) + new_ttl_info.update(getTimestampByIndex(ttl_column_after_aggregation.get(), i)); + } } void TTLAggregationAlgorithm::calculateAggregates(const MutableColumns & aggregate_columns, size_t start_pos, size_t length) @@ -133,6 +155,7 @@ void TTLAggregationAlgorithm::calculateAggregates(const MutableColumns & aggrega aggregator->executeOnBlock(aggregate_chunk, length, aggregation_result, key_columns, columns_for_aggregator, no_more_keys); + } void TTLAggregationAlgorithm::finalizeAggregates(MutableColumns & result_columns) @@ -140,6 +163,7 @@ void TTLAggregationAlgorithm::finalizeAggregates(MutableColumns & result_columns if (!aggregation_result.empty()) { auto aggregated_res = aggregator->convertToBlocks(aggregation_result, true, 1); + for (auto & agg_block : aggregated_res) { for (const auto & it : description.set_parts) diff --git a/src/DataStreams/TTLColumnAlgorithm.cpp b/src/DataStreams/TTLColumnAlgorithm.cpp index 140631ac0bf..5c0a5e1ae83 100644 --- a/src/DataStreams/TTLColumnAlgorithm.cpp +++ b/src/DataStreams/TTLColumnAlgorithm.cpp @@ -21,6 +21,8 @@ TTLColumnAlgorithm::TTLColumnAlgorithm( new_ttl_info = old_ttl_info; is_fully_empty = false; } + else + new_ttl_info.finished = true; } void TTLColumnAlgorithm::execute(Block & block) diff --git a/src/DataStreams/TTLDeleteAlgorithm.cpp b/src/DataStreams/TTLDeleteAlgorithm.cpp index c364bb06f3e..f1bbe6d4b7d 100644 --- a/src/DataStreams/TTLDeleteAlgorithm.cpp +++ b/src/DataStreams/TTLDeleteAlgorithm.cpp @@ -9,6 +9,8 @@ TTLDeleteAlgorithm::TTLDeleteAlgorithm( { if (!isMinTTLExpired()) new_ttl_info = old_ttl_info; + else + new_ttl_info.finished = true; } 
void TTLDeleteAlgorithm::execute(Block & block) diff --git a/src/Storages/MergeTree/MergeTreeDataPartTTLInfo.cpp b/src/Storages/MergeTree/MergeTreeDataPartTTLInfo.cpp index e130fbc1798..f1beb09c482 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartTTLInfo.cpp +++ b/src/Storages/MergeTree/MergeTreeDataPartTTLInfo.cpp @@ -55,6 +55,10 @@ void MergeTreeDataPartTTLInfos::read(ReadBuffer & in) MergeTreeDataPartTTLInfo ttl_info; ttl_info.min = col["min"].getUInt(); ttl_info.max = col["max"].getUInt(); + + if (col.has("finished")) + ttl_info.finished = col["finished"].getUInt(); + String name = col["name"].getString(); columns_ttl.emplace(name, ttl_info); @@ -67,6 +71,9 @@ void MergeTreeDataPartTTLInfos::read(ReadBuffer & in) table_ttl.min = table["min"].getUInt(); table_ttl.max = table["max"].getUInt(); + if (table.has("finished")) + table_ttl.finished = table["finished"].getUInt(); + updatePartMinMaxTTL(table_ttl.min, table_ttl.max); } @@ -77,6 +84,10 @@ void MergeTreeDataPartTTLInfos::read(ReadBuffer & in) MergeTreeDataPartTTLInfo ttl_info; ttl_info.min = elem["min"].getUInt(); ttl_info.max = elem["max"].getUInt(); + + if (elem.has("finished")) + ttl_info.finished = elem["finished"].getUInt(); + String expression = elem["expression"].getString(); ttl_info_map.emplace(expression, ttl_info); @@ -126,6 +137,8 @@ void MergeTreeDataPartTTLInfos::write(WriteBuffer & out) const writeIntText(it->second.min, out); writeString(",\"max\":", out); writeIntText(it->second.max, out); + writeString(R"(,"finished":)", out); + writeIntText(static_cast<uint8_t>(it->second.finished), out); writeString("}", out); } writeString("]", out); @@ -138,6 +151,8 @@ void MergeTreeDataPartTTLInfos::write(WriteBuffer & out) const writeIntText(table_ttl.min, out); writeString(R"(,"max":)", out); writeIntText(table_ttl.max, out); + writeString(R"(,"finished":)", out); + writeIntText(static_cast<uint8_t>(table_ttl.finished), out); writeString("}", out); } @@ -159,6 +174,8 @@ void 
MergeTreeDataPartTTLInfos::write(WriteBuffer & out) const writeIntText(it->second.min, out); writeString(R"(,"max":)", out); writeIntText(it->second.max, out); + writeString(R"(,"finished":)", out); + writeIntText(static_cast<uint8_t>(it->second.finished), out); writeString("}", out); } writeString("]", out); @@ -202,6 +219,39 @@ time_t MergeTreeDataPartTTLInfos::getMinimalMaxRecompressionTTL() const return max; } +bool MergeTreeDataPartTTLInfos::hasAnyNonFinishedTTLs() const +{ + auto has_non_finished_ttl = [] (const TTLInfoMap & map) -> bool + { + for (const auto & [name, info] : map) + { + if (!info.finished) + return true; + } + return false; + }; + + if (!table_ttl.finished) + return true; + + if (has_non_finished_ttl(columns_ttl)) + return true; + + if (has_non_finished_ttl(rows_where_ttl)) + return true; + + if (has_non_finished_ttl(moves_ttl)) + return true; + + if (has_non_finished_ttl(recompression_ttl)) + return true; + + if (has_non_finished_ttl(group_by_ttl)) + return true; + + return false; +} + std::optional<TTLDescription> selectTTLDescriptionForTTLInfos(const TTLDescriptions & descriptions, const TTLInfoMap & ttl_info_map, time_t current_time, bool use_max) { time_t best_ttl_time = 0; @@ -232,4 +282,5 @@ std::optional selectTTLDescriptionForTTLInfos(const TTLDescripti return best_ttl_time ? *best_entry_it : std::optional<TTLDescription>(); } + } diff --git a/src/Storages/MergeTree/MergeTreeDataPartTTLInfo.h b/src/Storages/MergeTree/MergeTreeDataPartTTLInfo.h index 9d1606ee44a..2b79ad1aac5 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartTTLInfo.h +++ b/src/Storages/MergeTree/MergeTreeDataPartTTLInfo.h @@ -14,6 +14,11 @@ struct MergeTreeDataPartTTLInfo time_t min = 0; time_t max = 0; + /// This TTL was computed on completely expired part. It doesn't make sense + /// to select such parts for TTL again. But it makes sense to recalculate TTL + /// again for merge with multiple parts. 
+ bool finished = false; + void update(time_t time) { if (time && (!min || time < min)) @@ -28,6 +33,7 @@ struct MergeTreeDataPartTTLInfo min = other_info.min; max = std::max(other_info.max, max); + finished &= other_info.finished; } }; @@ -60,6 +66,9 @@ struct MergeTreeDataPartTTLInfos void write(WriteBuffer & out) const; void update(const MergeTreeDataPartTTLInfos & other_infos); + /// Has any TTLs which are not calculated on completely expired parts. + bool hasAnyNonFinishedTTLs() const; + void updatePartMinMaxTTL(time_t time_min, time_t time_max) { if (time_min && (!part_min_ttl || time_min < part_min_ttl)) diff --git a/src/Storages/MergeTree/TTLMergeSelector.cpp b/src/Storages/MergeTree/TTLMergeSelector.cpp index fc7aa93e129..ab686c9952d 100644 --- a/src/Storages/MergeTree/TTLMergeSelector.cpp +++ b/src/Storages/MergeTree/TTLMergeSelector.cpp @@ -111,6 +111,9 @@ bool TTLDeleteMergeSelector::isTTLAlreadySatisfied(const IMergeSelector::Part & if (only_drop_parts) return false; + if (!part.ttl_infos->hasAnyNonFinishedTTLs()) + return false; + return !part.shall_participate_in_merges; } diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index e91f3d9554e..ea4376a56ec 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -551,6 +551,10 @@ void StorageReplicatedMergeTree::waitMutationToFinishOnReplicas( break; } + /// This replica is inactive, don't check anything + if (!inactive_replicas.empty() && inactive_replicas.count(replica)) + break; + /// It maybe already removed from zk, but local in-memory mutations /// state was not updated. 
if (!getZooKeeper()->exists(fs::path(zookeeper_path) / "mutations" / mutation_id)) diff --git a/tests/integration/test_ttl_replicated/test.py b/tests/integration/test_ttl_replicated/test.py index de5e5984082..f37c28b2a80 100644 --- a/tests/integration/test_ttl_replicated/test.py +++ b/tests/integration/test_ttl_replicated/test.py @@ -351,6 +351,7 @@ def test_ttl_compatibility(started_cluster, node_left, node_right, num_run): ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/test_ttl_delete_{suff}', '{replica}') ORDER BY id PARTITION BY toDayOfMonth(date) TTL date + INTERVAL 3 SECOND + SETTINGS max_number_of_merges_with_ttl_in_pool=100, max_replicated_merges_with_ttl_in_queue=100 '''.format(suff=num_run, replica=node.name)) node.query( @@ -359,6 +360,7 @@ def test_ttl_compatibility(started_cluster, node_left, node_right, num_run): ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/test_ttl_group_by_{suff}', '{replica}') ORDER BY id PARTITION BY toDayOfMonth(date) TTL date + INTERVAL 3 SECOND GROUP BY id SET val = sum(val) + SETTINGS max_number_of_merges_with_ttl_in_pool=100, max_replicated_merges_with_ttl_in_queue=100 '''.format(suff=num_run, replica=node.name)) node.query( @@ -367,6 +369,7 @@ def test_ttl_compatibility(started_cluster, node_left, node_right, num_run): ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/test_ttl_where_{suff}', '{replica}') ORDER BY id PARTITION BY toDayOfMonth(date) TTL date + INTERVAL 3 SECOND DELETE WHERE id % 2 = 1 + SETTINGS max_number_of_merges_with_ttl_in_pool=100, max_replicated_merges_with_ttl_in_queue=100 '''.format(suff=num_run, replica=node.name)) node_left.query("INSERT INTO test_ttl_delete VALUES (now(), 1)") @@ -397,9 +400,9 @@ def test_ttl_compatibility(started_cluster, node_left, node_right, num_run): node_right.query("OPTIMIZE TABLE test_ttl_group_by FINAL") node_right.query("OPTIMIZE TABLE test_ttl_where FINAL") - exec_query_with_retry(node_left, "SYSTEM SYNC REPLICA test_ttl_delete") - 
node_left.query("SYSTEM SYNC REPLICA test_ttl_group_by", timeout=20) - node_left.query("SYSTEM SYNC REPLICA test_ttl_where", timeout=20) + exec_query_with_retry(node_left, "OPTIMIZE TABLE test_ttl_delete FINAL") + node_left.query("OPTIMIZE TABLE test_ttl_group_by FINAL", timeout=20) + node_left.query("OPTIMIZE TABLE test_ttl_where FINAL", timeout=20) # After OPTIMIZE TABLE, it is not guaranteed that everything is merged. # Possible scenario (for test_ttl_group_by): @@ -414,6 +417,10 @@ def test_ttl_compatibility(started_cluster, node_left, node_right, num_run): node_right.query("SYSTEM SYNC REPLICA test_ttl_group_by", timeout=20) node_right.query("SYSTEM SYNC REPLICA test_ttl_where", timeout=20) + exec_query_with_retry(node_left, "SYSTEM SYNC REPLICA test_ttl_delete") + node_left.query("SYSTEM SYNC REPLICA test_ttl_group_by", timeout=20) + node_left.query("SYSTEM SYNC REPLICA test_ttl_where", timeout=20) + assert node_left.query("SELECT id FROM test_ttl_delete ORDER BY id") == "2\n4\n" assert node_right.query("SELECT id FROM test_ttl_delete ORDER BY id") == "2\n4\n" diff --git a/tests/integration/test_version_update_after_mutation/test.py b/tests/integration/test_version_update_after_mutation/test.py index dd8e1bc7a9e..03387b0be67 100644 --- a/tests/integration/test_version_update_after_mutation/test.py +++ b/tests/integration/test_version_update_after_mutation/test.py @@ -79,7 +79,10 @@ def test_upgrade_while_mutation(start_cluster): node3.restart_with_latest_version(signal=9) - exec_query_with_retry(node3, "ALTER TABLE mt1 DELETE WHERE id > 100000", settings={"mutations_sync": "2"}) + # wait replica became active + exec_query_with_retry(node3, "SYSTEM RESTART REPLICA mt1") + + node3.query("ALTER TABLE mt1 DELETE WHERE id > 100000", settings={"mutations_sync": "2"}) # will delete nothing, but previous async mutation will finish with this query assert_eq_with_retry(node3, "SELECT COUNT() from mt1", "50000\n") From 1b56b0a02058054f8193002307af5835ed95320a Mon Sep 17 
00:00:00 2001 From: alesapin Date: Mon, 28 Jun 2021 11:10:38 +0300 Subject: [PATCH 2/4] Fix flaky test --- src/DataStreams/TTLAggregationAlgorithm.cpp | 7 ++++++- .../0_stateless/01280_ttl_where_group_by.reference | 8 ++++---- tests/queries/0_stateless/01280_ttl_where_group_by.sh | 4 ++-- 3 files changed, 12 insertions(+), 7 deletions(-) diff --git a/src/DataStreams/TTLAggregationAlgorithm.cpp b/src/DataStreams/TTLAggregationAlgorithm.cpp index 6d5c234a074..12d28ff4aea 100644 --- a/src/DataStreams/TTLAggregationAlgorithm.cpp +++ b/src/DataStreams/TTLAggregationAlgorithm.cpp @@ -137,8 +137,13 @@ void TTLAggregationAlgorithm::execute(Block & block) if (some_rows_were_aggregated) { auto ttl_column_after_aggregation = executeExpressionAndGetColumn(description.expression, block, description.result_column); + auto where_column_after_aggregation = executeExpressionAndGetColumn(description.where_expression, block, description.where_result_column); for (size_t i = 0; i < block.rows(); ++i) - new_ttl_info.update(getTimestampByIndex(ttl_column_after_aggregation.get(), i)); + { + bool where_filter_passed = !where_column_after_aggregation || where_column_after_aggregation->getBool(i); + if (where_filter_passed) + new_ttl_info.update(getTimestampByIndex(ttl_column_after_aggregation.get(), i)); + } } } diff --git a/tests/queries/0_stateless/01280_ttl_where_group_by.reference b/tests/queries/0_stateless/01280_ttl_where_group_by.reference index 7fe00709dee..65e7e5b158f 100644 --- a/tests/queries/0_stateless/01280_ttl_where_group_by.reference +++ b/tests/queries/0_stateless/01280_ttl_where_group_by.reference @@ -16,11 +16,11 @@ ttl_01280_3 2 1 0 3 3 1 8 2 ttl_01280_4 -1 1 0 4 -10 2 13 9 +0 4 +13 9 ttl_01280_5 1 2 7 5 2 3 6 5 ttl_01280_6 -1 5 3 5 -2 10 3 5 +1 3 5 +2 3 5 diff --git a/tests/queries/0_stateless/01280_ttl_where_group_by.sh b/tests/queries/0_stateless/01280_ttl_where_group_by.sh index 9f30c7c5872..c9936ce7afd 100755 --- 
a/tests/queries/0_stateless/01280_ttl_where_group_by.sh +++ b/tests/queries/0_stateless/01280_ttl_where_group_by.sh @@ -80,7 +80,7 @@ insert into ttl_01280_4 values (1, 5, 4, 9, now())" sleep 2 optimize "ttl_01280_4" -$CLICKHOUSE_CLIENT --query "select a, b, x, y from ttl_01280_4 ORDER BY a, b, x, y" +$CLICKHOUSE_CLIENT --query "select x, y from ttl_01280_4 ORDER BY a, b, x, y" $CLICKHOUSE_CLIENT --query "drop table if exists ttl_01280_5" @@ -107,7 +107,7 @@ insert into ttl_01280_6 values (1, 5, 3, 5, now())" sleep 2 optimize "ttl_01280_6" -$CLICKHOUSE_CLIENT --query "select a, b, x, y from ttl_01280_6 ORDER BY a, b, x, y" +$CLICKHOUSE_CLIENT --query "select a, x, y from ttl_01280_6 ORDER BY a, b, x, y" $CLICKHOUSE_CLIENT -q "DROP TABLE ttl_01280_1" $CLICKHOUSE_CLIENT -q "DROP TABLE ttl_01280_2" From 72b281987e59028d215c0ee77ec0bef99072d30f Mon Sep 17 00:00:00 2001 From: alesapin Date: Mon, 28 Jun 2021 17:14:26 +0300 Subject: [PATCH 3/4] Add more debug --- src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp b/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp index 766d988500d..a348b07ba92 100644 --- a/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp +++ b/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp @@ -752,13 +752,16 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor bool force_ttl = false; for (const auto & part : parts) { - new_data_part->ttl_infos.update(part->ttl_infos); if (metadata_snapshot->hasAnyTTL() && !part->checkAllTTLCalculated(metadata_snapshot)) { LOG_INFO(log, "Some TTL values were not calculated for part {}. 
Will calculate them forcefully during merge.", part->name); need_remove_expired_values = true; force_ttl = true; } + else + { + new_data_part->ttl_infos.update(part->ttl_infos); + } } const auto & part_min_ttl = new_data_part->ttl_infos.part_min_ttl; @@ -939,7 +942,10 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor merged_stream = std::make_shared<DistinctSortedBlockInputStream>(merged_stream, sort_description, SizeLimits(), 0 /*limit_hint*/, deduplicate_by_columns); if (need_remove_expired_values) + { + LOG_DEBUG(log, "Outdated rows found in source parts, TTLs processing enabled for merge"); merged_stream = std::make_shared<TTLBlockInputStream>(merged_stream, data, metadata_snapshot, new_data_part, time_of_merge, force_ttl); + } if (metadata_snapshot->hasSecondaryIndices()) { From 75e26b93d066b424eababe08d29eda9f0e95edec Mon Sep 17 00:00:00 2001 From: alesapin Date: Tue, 6 Jul 2021 15:05:18 +0300 Subject: [PATCH 4/4] Review bug fixes --- src/DataStreams/TTLAggregationAlgorithm.cpp | 2 +- src/DataStreams/TTLColumnAlgorithm.cpp | 3 ++- src/DataStreams/TTLDeleteAlgorithm.cpp | 3 ++- src/Storages/MergeTree/TTLMergeSelector.cpp | 3 ++- 4 files changed, 7 insertions(+), 4 deletions(-) diff --git a/src/DataStreams/TTLAggregationAlgorithm.cpp b/src/DataStreams/TTLAggregationAlgorithm.cpp index 12d28ff4aea..287ecb7dd6e 100644 --- a/src/DataStreams/TTLAggregationAlgorithm.cpp +++ b/src/DataStreams/TTLAggregationAlgorithm.cpp @@ -37,7 +37,7 @@ TTLAggregationAlgorithm::TTLAggregationAlgorithm( aggregator = std::make_unique<Aggregator>(params); - if (isMinTTLExpired()) + if (isMaxTTLExpired()) new_ttl_info.finished = true; } diff --git a/src/DataStreams/TTLColumnAlgorithm.cpp b/src/DataStreams/TTLColumnAlgorithm.cpp index 5c0a5e1ae83..1318ea382db 100644 --- a/src/DataStreams/TTLColumnAlgorithm.cpp +++ b/src/DataStreams/TTLColumnAlgorithm.cpp @@ -21,7 +21,8 @@ TTLColumnAlgorithm::TTLColumnAlgorithm( new_ttl_info = old_ttl_info; is_fully_empty = false; } - else + + if (isMaxTTLExpired()) 
new_ttl_info.finished = true; } diff --git a/src/DataStreams/TTLDeleteAlgorithm.cpp b/src/DataStreams/TTLDeleteAlgorithm.cpp index f1bbe6d4b7d..ea7a0b235ec 100644 --- a/src/DataStreams/TTLDeleteAlgorithm.cpp +++ b/src/DataStreams/TTLDeleteAlgorithm.cpp @@ -9,7 +9,8 @@ TTLDeleteAlgorithm::TTLDeleteAlgorithm( { if (!isMinTTLExpired()) new_ttl_info = old_ttl_info; - else + + if (isMaxTTLExpired()) new_ttl_info.finished = true; } diff --git a/src/Storages/MergeTree/TTLMergeSelector.cpp b/src/Storages/MergeTree/TTLMergeSelector.cpp index ab686c9952d..6a42ce039ac 100644 --- a/src/Storages/MergeTree/TTLMergeSelector.cpp +++ b/src/Storages/MergeTree/TTLMergeSelector.cpp @@ -111,8 +111,9 @@ bool TTLDeleteMergeSelector::isTTLAlreadySatisfied(const IMergeSelector::Part & if (only_drop_parts) return false; + /// All TTL satisfied if (!part.ttl_infos->hasAnyNonFinishedTTLs()) - return false; + return true; return !part.shall_participate_in_merges; }