From ffe9eab1d0da8f2f718fcd553e5baec59f9438be Mon Sep 17 00:00:00 2001 From: terrylin Date: Tue, 3 Aug 2021 15:37:25 +0800 Subject: [PATCH] update test cases and fix bug --- src/DataStreams/TTLBlockInputStream.cpp | 4 +- src/DataStreams/TTLCalcInputStream.cpp | 12 +++--- src/DataStreams/TTLUpdateInfoAlgorithm.cpp | 39 ++++++++++++------- src/DataStreams/TTLUpdateInfoAlgorithm.h | 23 ++++++++++- tests/integration/test_ttl_replicated/test.py | 14 +++---- .../01560_ttl_remove_empty_parts.sql | 3 ++ 6 files changed, 66 insertions(+), 29 deletions(-) diff --git a/src/DataStreams/TTLBlockInputStream.cpp b/src/DataStreams/TTLBlockInputStream.cpp index 2cf7c121868..fc557bccad1 100644 --- a/src/DataStreams/TTLBlockInputStream.cpp +++ b/src/DataStreams/TTLBlockInputStream.cpp @@ -82,11 +82,11 @@ TTLBlockInputStream::TTLBlockInputStream( for (const auto & move_ttl : metadata_snapshot_->getMoveTTLs()) algorithms.emplace_back(std::make_unique( - move_ttl, old_ttl_infos.moves_ttl[move_ttl.result_column], current_time_, force_)); + move_ttl, TTLUpdateType::MOVES_TTL, move_ttl.result_column, old_ttl_infos.moves_ttl[move_ttl.result_column], current_time_, force_)); for (const auto & recompression_ttl : metadata_snapshot_->getRecompressionTTLs()) algorithms.emplace_back(std::make_unique( - recompression_ttl, old_ttl_infos.recompression_ttl[recompression_ttl.result_column], current_time_, force_)); + recompression_ttl, TTLUpdateType::RECOMPRESSION_TTL, recompression_ttl.result_column, old_ttl_infos.recompression_ttl[recompression_ttl.result_column], current_time_, force_)); } Block reorderColumns(Block block, const Block & header) diff --git a/src/DataStreams/TTLCalcInputStream.cpp b/src/DataStreams/TTLCalcInputStream.cpp index e34e85d4a67..82b17433b77 100644 --- a/src/DataStreams/TTLCalcInputStream.cpp +++ b/src/DataStreams/TTLCalcInputStream.cpp @@ -22,33 +22,33 @@ TTLCalcInputStream::TTLCalcInputStream( { const auto & rows_ttl = metadata_snapshot_->getRowsTTL(); algorithms.emplace_back(std::make_unique( - rows_ttl, old_ttl_infos.table_ttl, current_time_, force_)); + rows_ttl, TTLUpdateType::TABLE_TTL, rows_ttl.result_column, old_ttl_infos.table_ttl, current_time_, force_)); } for (const auto & where_ttl : metadata_snapshot_->getRowsWhereTTLs()) algorithms.emplace_back(std::make_unique( - where_ttl, old_ttl_infos.rows_where_ttl[where_ttl.result_column], current_time_, force_)); + where_ttl, TTLUpdateType::ROWS_WHERE_TTL, where_ttl.result_column, old_ttl_infos.rows_where_ttl[where_ttl.result_column], current_time_, force_)); for (const auto & group_by_ttl : metadata_snapshot_->getGroupByTTLs()) algorithms.emplace_back(std::make_unique( - group_by_ttl, old_ttl_infos.group_by_ttl[group_by_ttl.result_column], current_time_, force_)); + group_by_ttl, TTLUpdateType::GROUP_BY_TTL, group_by_ttl.result_column, old_ttl_infos.group_by_ttl[group_by_ttl.result_column], current_time_, force_)); if (metadata_snapshot_->hasAnyColumnTTL()) { for (const auto & [name, description] : metadata_snapshot_->getColumnTTLs()) { algorithms.emplace_back(std::make_unique( - description, old_ttl_infos.columns_ttl[name], current_time_, force_)); + description, TTLUpdateType::COLUMNS_TTL, name, old_ttl_infos.columns_ttl[name], current_time_, force_)); } } for (const auto & move_ttl : metadata_snapshot_->getMoveTTLs()) algorithms.emplace_back(std::make_unique( - move_ttl, old_ttl_infos.moves_ttl[move_ttl.result_column], current_time_, force_)); + move_ttl, TTLUpdateType::MOVES_TTL, move_ttl.result_column, old_ttl_infos.moves_ttl[move_ttl.result_column], current_time_, force_)); for (const auto & recompression_ttl : metadata_snapshot_->getRecompressionTTLs()) algorithms.emplace_back(std::make_unique( - recompression_ttl, old_ttl_infos.recompression_ttl[recompression_ttl.result_column], current_time_, force_)); + recompression_ttl, TTLUpdateType::RECOMPRESSION_TTL, recompression_ttl.result_column, old_ttl_infos.recompression_ttl[recompression_ttl.result_column], current_time_, force_)); } Block TTLCalcInputStream::readImpl() diff --git a/src/DataStreams/TTLUpdateInfoAlgorithm.cpp b/src/DataStreams/TTLUpdateInfoAlgorithm.cpp index 49006be7c59..21e36f1361c 100644 --- a/src/DataStreams/TTLUpdateInfoAlgorithm.cpp +++ b/src/DataStreams/TTLUpdateInfoAlgorithm.cpp @@ -4,8 +4,15 @@ namespace DB { TTLUpdateInfoAlgorithm::TTLUpdateInfoAlgorithm( - const TTLDescription & description_, const TTLInfo & old_ttl_info_, time_t current_time_, bool force_) + const TTLDescription & description_, + const TTLUpdateType ttl_update_type_, + const String ttl_update_key_, + const TTLInfo & old_ttl_info_, + time_t current_time_, + bool force_) : ITTLAlgorithm(description_, old_ttl_info_, current_time_, force_) + , ttl_update_type(ttl_update_type_) + , ttl_update_key(ttl_update_key_) { } @@ -24,26 +31,32 @@ void TTLUpdateInfoAlgorithm::execute(Block & block) void TTLUpdateInfoAlgorithm::finalize(const MutableDataPartPtr & data_part) const { - if (description.mode == TTLMode::RECOMPRESS) + if (ttl_update_type == TTLUpdateType::RECOMPRESSION_TTL) { - data_part->ttl_infos.recompression_ttl[description.result_column] = new_ttl_info; + data_part->ttl_infos.recompression_ttl[ttl_update_key] = new_ttl_info; } - else if (description.mode == TTLMode::MOVE) + else if (ttl_update_type == TTLUpdateType::MOVES_TTL) { - data_part->ttl_infos.moves_ttl[description.result_column] = new_ttl_info; + data_part->ttl_infos.moves_ttl[ttl_update_key] = new_ttl_info; } - else if (description.mode == TTLMode::GROUP_BY) + else if (ttl_update_type == TTLUpdateType::GROUP_BY_TTL) { - data_part->ttl_infos.group_by_ttl[description.result_column] = new_ttl_info; + data_part->ttl_infos.group_by_ttl[ttl_update_key] = new_ttl_info; data_part->ttl_infos.updatePartMinMaxTTL(new_ttl_info.min, new_ttl_info.max); } - else if (description.mode == TTLMode::DELETE) + else if (ttl_update_type == TTLUpdateType::ROWS_WHERE_TTL) { - if (description.where_expression) - data_part->ttl_infos.rows_where_ttl[description.result_column] = new_ttl_info; - else - data_part->ttl_infos.table_ttl = new_ttl_info; - + data_part->ttl_infos.rows_where_ttl[ttl_update_key] = new_ttl_info; + data_part->ttl_infos.updatePartMinMaxTTL(new_ttl_info.min, new_ttl_info.max); + } + else if (ttl_update_type == TTLUpdateType::TABLE_TTL) + { + data_part->ttl_infos.table_ttl = new_ttl_info; + data_part->ttl_infos.updatePartMinMaxTTL(new_ttl_info.min, new_ttl_info.max); + } + else if (ttl_update_type == TTLUpdateType::COLUMNS_TTL) + { + data_part->ttl_infos.columns_ttl[ttl_update_key] = new_ttl_info; data_part->ttl_infos.updatePartMinMaxTTL(new_ttl_info.min, new_ttl_info.max); } diff --git a/src/DataStreams/TTLUpdateInfoAlgorithm.h b/src/DataStreams/TTLUpdateInfoAlgorithm.h index c0c4dcea755..5210b3c40c9 100644 --- a/src/DataStreams/TTLUpdateInfoAlgorithm.h +++ b/src/DataStreams/TTLUpdateInfoAlgorithm.h @@ -5,14 +5,35 @@ namespace DB { +enum class TTLUpdateType +{ + COLUMNS_TTL, + TABLE_TTL, + ROWS_WHERE_TTL, + MOVES_TTL, + RECOMPRESSION_TTL, + GROUP_BY_TTL, +}; + /// Calculates new ttl_info and does nothing with data. class TTLUpdateInfoAlgorithm : public ITTLAlgorithm { public: - TTLUpdateInfoAlgorithm(const TTLDescription & description_, const TTLInfo & old_ttl_info_, time_t current_time_, bool force_); + TTLUpdateInfoAlgorithm( + const TTLDescription & description_, + const TTLUpdateType ttl_update_type_, + const String ttl_update_key_, + const TTLInfo & old_ttl_info_, + time_t current_time_, bool force_ + ); void execute(Block & block) override; void finalize(const MutableDataPartPtr & data_part) const override; + +private: + const TTLUpdateType ttl_update_type; + const String ttl_update_key; }; + } diff --git a/tests/integration/test_ttl_replicated/test.py b/tests/integration/test_ttl_replicated/test.py index a5a7355f912..9a815aaca7e 100644 --- a/tests/integration/test_ttl_replicated/test.py +++ b/tests/integration/test_ttl_replicated/test.py @@ -169,15 +169,15 @@ def test_modify_ttl(started_cluster): node2.query("SYSTEM SYNC REPLICA test_ttl", timeout=20) node1.query("ALTER TABLE test_ttl MODIFY TTL d + INTERVAL 4 HOUR SETTINGS mutations_sync = 2") - time.sleep(5) # TTL merges shall happen. + time.sleep(6) # TTL merges shall happen. assert node2.query("SELECT id FROM test_ttl") == "2\n3\n" node2.query("ALTER TABLE test_ttl MODIFY TTL d + INTERVAL 2 HOUR SETTINGS mutations_sync = 2") - time.sleep(5) # TTL merges shall happen. + time.sleep(6) # TTL merges shall happen. assert node1.query("SELECT id FROM test_ttl") == "3\n" node1.query("ALTER TABLE test_ttl MODIFY TTL d + INTERVAL 30 MINUTE SETTINGS mutations_sync = 2") - time.sleep(5) # TTL merges shall happen. + time.sleep(6) # TTL merges shall happen. assert node2.query("SELECT id FROM test_ttl") == "" @@ -196,15 +196,15 @@ def test_modify_column_ttl(started_cluster): node2.query("SYSTEM SYNC REPLICA test_ttl", timeout=20) node1.query("ALTER TABLE test_ttl MODIFY COLUMN id UInt32 TTL d + INTERVAL 4 HOUR SETTINGS mutations_sync = 2") - time.sleep(5) # TTL merges shall happen. + time.sleep(6) # TTL merges shall happen. assert node2.query("SELECT id FROM test_ttl") == "42\n2\n3\n" node1.query("ALTER TABLE test_ttl MODIFY COLUMN id UInt32 TTL d + INTERVAL 2 HOUR SETTINGS mutations_sync = 2") - time.sleep(5) # TTL merges shall happen. + time.sleep(6) # TTL merges shall happen. assert node1.query("SELECT id FROM test_ttl") == "42\n42\n3\n" node1.query("ALTER TABLE test_ttl MODIFY COLUMN id UInt32 TTL d + INTERVAL 30 MINUTE SETTINGS mutations_sync = 2") - time.sleep(5) # TTL merges shall happen. + time.sleep(6) # TTL merges shall happen. assert node2.query("SELECT id FROM test_ttl") == "42\n42\n42\n" @@ -328,7 +328,7 @@ def test_ttl_empty_parts(started_cluster): node.query("SYSTEM START TTL MERGES test_ttl_empty_parts") optimize_with_retry(node1, 'test_ttl_empty_parts') - assert node1.query("SELECT name FROM system.parts WHERE table = 'test_ttl_empty_parts' AND active ORDER BY name") == "all_0_4_1_6\n" + assert node1.query("SELECT name FROM system.parts WHERE table = 'test_ttl_empty_parts' AND active ORDER BY name") == "all_0_5_1_6\n" assert node1.query("SELECT count() FROM test_ttl_empty_parts") == "3000\n" diff --git a/tests/queries/0_stateless/01560_ttl_remove_empty_parts.sql b/tests/queries/0_stateless/01560_ttl_remove_empty_parts.sql index f40ed70caef..83b2175a41e 100644 --- a/tests/queries/0_stateless/01560_ttl_remove_empty_parts.sql +++ b/tests/queries/0_stateless/01560_ttl_remove_empty_parts.sql @@ -10,6 +10,9 @@ SELECT count() FROM system.parts WHERE table = 'ttl_empty_parts' AND database = ALTER TABLE ttl_empty_parts MODIFY TTL d; +SELECT sleep(3) format Null; +SELECT sleep(3) format Null; + -- To be sure, that task, which clears outdated parts executed. DETACH TABLE ttl_empty_parts; ATTACH TABLE ttl_empty_parts;