diff --git a/dbms/src/Storages/MergeTree/MergeTreeData.cpp b/dbms/src/Storages/MergeTree/MergeTreeData.cpp
index ad42fc90be5..210d412d86f 100644
--- a/dbms/src/Storages/MergeTree/MergeTreeData.cpp
+++ b/dbms/src/Storages/MergeTree/MergeTreeData.cpp
@@ -633,6 +633,10 @@ void MergeTreeData::setTTLExpressions(const ColumnsDescription::ColumnTTLs & new
 
     if (new_ttl_table_ast)
     {
+        std::vector<TTLEntry> update_move_ttl_entries;
+        ASTPtr update_ttl_table_ast = nullptr;
+        TTLEntry update_ttl_table_entry;
+
         bool seen_delete_ttl = false;
         for (auto ttl_element_ptr : new_ttl_table_ast->children)
         {
@@ -647,8 +651,8 @@ void MergeTreeData::setTTLExpressions(const ColumnsDescription::ColumnTTLs & new
                 auto new_ttl_table_entry = create_ttl_entry(ttl_element.children[0]);
                 if (!only_check)
                 {
-                    ttl_table_ast = ttl_element.children[0];
-                    ttl_table_entry = new_ttl_table_entry;
+                    update_ttl_table_ast = ttl_element.children[0];
+                    update_ttl_table_entry = new_ttl_table_entry;
                 }
 
                 seen_delete_ttl = true;
@@ -671,11 +675,18 @@ void MergeTreeData::setTTLExpressions(const ColumnsDescription::ColumnTTLs & new
                 }
 
                 if (!only_check)
-                {
-                    move_ttl_entries.emplace_back(std::move(new_ttl_entry));
-                }
+                    update_move_ttl_entries.emplace_back(std::move(new_ttl_entry));
             }
         }
+
+        if (!only_check)
+        {
+            ttl_table_entry = update_ttl_table_entry;
+            ttl_table_ast = update_ttl_table_ast;
+
+            auto move_ttl_entries_lock = std::lock_guard(move_ttl_entries_mutex);
+            move_ttl_entries = update_move_ttl_entries;
+        }
     }
 }
 
@@ -3293,7 +3304,7 @@ ReservationPtr MergeTreeData::tryReserveSpacePreferringTTLRules(UInt64 expected_
     ReservationPtr reservation;
 
     auto ttl_entry = selectTTLEntryForTTLInfos(ttl_infos, time_of_move);
-    if (ttl_entry != nullptr)
+    if (ttl_entry)
     {
         SpacePtr destination_ptr = ttl_entry->getDestination(storage_policy);
         if (!destination_ptr)
@@ -3352,27 +3363,28 @@ bool MergeTreeData::TTLEntry::isPartInDestination(const StoragePolicyPtr & polic
     return false;
 }
 
-const MergeTreeData::TTLEntry * MergeTreeData::selectTTLEntryForTTLInfos(
+std::optional<MergeTreeData::TTLEntry> MergeTreeData::selectTTLEntryForTTLInfos(
         const MergeTreeDataPart::TTLInfos & ttl_infos, time_t time_of_move) const
 {
-    const MergeTreeData::TTLEntry * result = nullptr;
-    /// Prefer TTL rule which went into action last.
     time_t max_max_ttl = 0;
+    std::vector<TTLEntry>::const_iterator best_entry_it;
 
-    for (const auto & ttl_entry : move_ttl_entries)
+    auto lock = std::lock_guard(move_ttl_entries_mutex);
+    for (auto ttl_entry_it = move_ttl_entries.begin(); ttl_entry_it != move_ttl_entries.end(); ++ttl_entry_it)
     {
-        auto ttl_info_it = ttl_infos.moves_ttl.find(ttl_entry.result_column);
+        auto ttl_info_it = ttl_infos.moves_ttl.find(ttl_entry_it->result_column);
+        /// Prefer TTL rule which went into action last.
         if (ttl_info_it != ttl_infos.moves_ttl.end()
                 && ttl_info_it->second.max <= time_of_move
                 && max_max_ttl <= ttl_info_it->second.max)
         {
-            result = &ttl_entry;
+            best_entry_it = ttl_entry_it;
             max_max_ttl = ttl_info_it->second.max;
         }
     }
 
-    return result;
+    return max_max_ttl ? *best_entry_it : std::optional<TTLEntry>();
 }
 
 MergeTreeData::DataParts MergeTreeData::getDataParts(const DataPartStates & affordable_states) const
diff --git a/dbms/src/Storages/MergeTree/MergeTreeData.h b/dbms/src/Storages/MergeTree/MergeTreeData.h
index ca2730a8aef..3c051829a61 100644
--- a/dbms/src/Storages/MergeTree/MergeTreeData.h
+++ b/dbms/src/Storages/MergeTree/MergeTreeData.h
@@ -737,12 +737,17 @@ public:
         bool isPartInDestination(const StoragePolicyPtr & policy, const MergeTreeDataPart & part) const;
     };
 
-    const TTLEntry * selectTTLEntryForTTLInfos(const MergeTreeDataPart::TTLInfos & ttl_infos, time_t time_of_move) const;
+    std::optional<TTLEntry> selectTTLEntryForTTLInfos(const MergeTreeDataPart::TTLInfos & ttl_infos, time_t time_of_move) const;
 
     using TTLEntriesByName = std::unordered_map<String, TTLEntry>;
     TTLEntriesByName column_ttl_entries_by_name;
 
     TTLEntry ttl_table_entry;
+
+    /// This mutex is required for background move operations which do not obtain global locks.
+    mutable std::mutex move_ttl_entries_mutex;
+
+    /// Reads and writes of this vector have to be done under "move_ttl_entries_mutex".
     std::vector<TTLEntry> move_ttl_entries;
 
     String sampling_expr_column_name;
diff --git a/dbms/src/Storages/MergeTree/MergeTreePartsMover.cpp b/dbms/src/Storages/MergeTree/MergeTreePartsMover.cpp
index d2324ff37f9..2085f7b4fec 100644
--- a/dbms/src/Storages/MergeTree/MergeTreePartsMover.cpp
+++ b/dbms/src/Storages/MergeTree/MergeTreePartsMover.cpp
@@ -127,14 +127,14 @@ bool MergeTreePartsMover::selectPartsForMove(
         if (!can_move(part, &reason))
            continue;
 
-        const MergeTreeData::TTLEntry * ttl_entry_ptr = part->storage.selectTTLEntryForTTLInfos(part->ttl_infos, time_of_move);
+        auto ttl_entry = part->storage.selectTTLEntryForTTLInfos(part->ttl_infos, time_of_move);
         auto to_insert = need_to_move.find(part->disk);
         ReservationPtr reservation;
-        if (ttl_entry_ptr)
+        if (ttl_entry)
         {
-            auto destination = ttl_entry_ptr->getDestination(policy);
-            if (destination && !ttl_entry_ptr->isPartInDestination(policy, *part))
-                reservation = part->storage.tryReserveSpace(part->bytes_on_disk, ttl_entry_ptr->getDestination(policy));
+            auto destination = ttl_entry->getDestination(policy);
+            if (destination && !ttl_entry->isPartInDestination(policy, *part))
+                reservation = part->storage.tryReserveSpace(part->bytes_on_disk, ttl_entry->getDestination(policy));
         }
 
         if (reservation) /// Found reservation by TTL rule.
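Taken together, the C++ part of the patch does two things: `ALTER ... MODIFY TTL` now builds the new move rules in local variables and publishes them under `move_ttl_entries_mutex`, and `selectTTLEntryForTTLInfos` hands the chosen rule to the background mover by value (`std::optional<TTLEntry>`) instead of returning a pointer into a vector that a concurrent ALTER may reallocate. A minimal, self-contained sketch of that pattern (C++17, with hypothetical names `RuleSketch`/`RuleHolder` standing in for the real ClickHouse types):

```cpp
#include <ctime>
#include <iostream>
#include <mutex>
#include <optional>
#include <string>
#include <vector>

// Hypothetical stand-ins for MergeTreeData::TTLEntry and move_ttl_entries;
// this sketches the locking pattern, not the real ClickHouse classes.
struct RuleSketch
{
    std::string result_column;
    time_t max_ttl = 0;
};

class RuleHolder
{
public:
    // Writer side (ALTER MODIFY TTL): prepare the new rules outside the lock,
    // then swap them in within one short critical section.
    void replaceRules(std::vector<RuleSketch> fresh)
    {
        std::lock_guard lock(mutex);
        rules = std::move(fresh);
    }

    // Reader side (background move): pick the rule that went into action last
    // and return it BY VALUE, so the caller never keeps a pointer into a
    // vector that a concurrent ALTER may reallocate.
    std::optional<RuleSketch> selectRule(time_t time_of_move) const
    {
        std::lock_guard lock(mutex);
        std::optional<RuleSketch> best;
        for (const auto & rule : rules)
            if (rule.max_ttl <= time_of_move && (!best || best->max_ttl <= rule.max_ttl))
                best = rule;
        return best;
    }

private:
    mutable std::mutex mutex;
    std::vector<RuleSketch> rules;
};

int main()
{
    RuleHolder holder;
    holder.replaceRules({{"d1", 100}, {"d2", 200}});

    if (auto rule = holder.selectRule(/*time_of_move=*/300))
        std::cout << "move by rule on " << rule->result_column << '\n';  // prints "d2"
}
```

The same reasoning explains why the patch assigns the whole `move_ttl_entries` vector under the mutex rather than mutating it in place: a reader only ever observes either the old or the new rule set, never a half-updated one.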
diff --git a/dbms/tests/integration/test_ttl_move/test.py b/dbms/tests/integration/test_ttl_move/test.py
index 974bbde5709..7fabdd85230 100644
--- a/dbms/tests/integration/test_ttl_move/test.py
+++ b/dbms/tests/integration/test_ttl_move/test.py
@@ -74,37 +74,37 @@ def test_rule_with_invalid_destination(started_cluster, name, engine, alter):
                 {expression}
                 SETTINGS storage_policy='{policy}'
             """.format(expression=x, name=name, engine=engine, policy=policy)
-
+
         if alter:
             node1.query(get_command(None, "small_jbod_with_external"))
-
+
         with pytest.raises(QueryRuntimeException):
             node1.query(get_command("TTL d1 TO DISK 'unknown'", "small_jbod_with_external"))
-
+
         node1.query("DROP TABLE IF EXISTS {}".format(name))
-
+
         if alter:
             node1.query(get_command(None, "small_jbod_with_external"))
-
+
         with pytest.raises(QueryRuntimeException):
             node1.query(get_command("TTL d1 TO VOLUME 'unknown'", "small_jbod_with_external"))
-
+
         node1.query("DROP TABLE IF EXISTS {}".format(name))
-
+
         if alter:
             node1.query(get_command(None, "only_jbod2"))
-
+
         with pytest.raises(QueryRuntimeException):
             node1.query(get_command("TTL d1 TO DISK 'jbod1'", "only_jbod2"))
-
+
         node1.query("DROP TABLE IF EXISTS {}".format(name))
-
+
         if alter:
             node1.query(get_command(None, "only_jbod2"))
-
+
         with pytest.raises(QueryRuntimeException):
             node1.query(get_command("TTL d1 TO VOLUME 'external'", "only_jbod2"))
-
+
     finally:
         node1.query("DROP TABLE IF EXISTS {}".format(name))
 
@@ -501,13 +501,17 @@ def test_moves_after_merges_work(started_cluster, name, engine, positive):
         node1.query("DROP TABLE IF EXISTS {}".format(name))
 
 
-@pytest.mark.parametrize("name,engine,positive", [
-    ("mt_test_moves_after_merges_do_not_work","MergeTree()",0),
-    ("replicated_mt_test_moves_after_merges_do_not_work","ReplicatedMergeTree('/clickhouse/replicated_test_moves_after_merges_do_not_work', '1')",0),
-    ("mt_test_moves_after_merges_work","MergeTree()",1),
-    ("replicated_mt_test_moves_after_merges_work","ReplicatedMergeTree('/clickhouse/replicated_test_moves_after_merges_work', '1')",1),
+@pytest.mark.parametrize("name,engine,positive,bar", [
+    ("mt_test_moves_after_alter_do_not_work","MergeTree()",0,"DELETE"),
+    ("replicated_mt_test_moves_after_alter_do_not_work","ReplicatedMergeTree('/clickhouse/replicated_test_moves_after_alter_do_not_work', '1')",0,"DELETE"),
+    ("mt_test_moves_after_alter_work","MergeTree()",1,"DELETE"),
+    ("replicated_mt_test_moves_after_alter_work","ReplicatedMergeTree('/clickhouse/replicated_test_moves_after_alter_work', '1')",1,"DELETE"),
+    ("mt_test_moves_after_alter_do_not_work","MergeTree()",0,"TO DISK 'external'"),
+    ("replicated_mt_test_moves_after_alter_do_not_work","ReplicatedMergeTree('/clickhouse/replicated_test_moves_after_alter_do_not_work', '1')",0,"TO DISK 'external'"),
+    ("mt_test_moves_after_alter_work","MergeTree()",1,"TO DISK 'external'"),
+    ("replicated_mt_test_moves_after_alter_work","ReplicatedMergeTree('/clickhouse/replicated_test_moves_after_alter_work', '1')",1,"TO DISK 'external'"),
 ])
-def test_ttls_do_not_work_after_alter(started_cluster, name, engine, positive):
+def test_ttls_do_not_work_after_alter(started_cluster, name, engine, positive, bar):
     try:
         node1.query("""
             CREATE TABLE {name} (
@@ -523,8 +527,8 @@ def test_ttls_do_not_work_after_alter(started_cluster, name, engine, positive):
 
         node1.query("""
             ALTER TABLE {name} MODIFY TTL
-                d1 + INTERVAL 15 MINUTE
-            """.format(name=name)) # That shall disable TTL.
+                d1 + INTERVAL 15 MINUTE {bar}
+            """.format(name=name, bar=bar)) # That shall disable TTL.
 
         data = [] # 10MB in total
         for i in range(10):
@@ -578,14 +582,14 @@ limitations under the License."""
         ) ENGINE = {engine}
         ORDER BY tuple()
         PARTITION BY p1
-        TTL d1 + INTERVAL 30 SECOND TO DISK 'jbod2', 
+        TTL d1 + INTERVAL 30 SECOND TO DISK 'jbod2',
            d1 + INTERVAL 60 SECOND TO VOLUME 'external'
        SETTINGS storage_policy='jbods_with_external', merge_with_ttl_timeout=0
    """.format(name=name, engine=engine))
 
     node1.query("""
        ALTER TABLE {name} MODIFY
-        TTL d1 + INTERVAL 0 SECOND TO DISK 'jbod2', 
+        TTL d1 + INTERVAL 0 SECOND TO DISK 'jbod2',
            d1 + INTERVAL 5 SECOND TO VOLUME 'external',
            d1 + INTERVAL 10 SECOND DELETE
    """.format(name=name))
@@ -622,6 +626,96 @@ limitations under the License."""
 
     node1.query("DROP TABLE IF EXISTS {name}".format(name=name))
 
 
+@pytest.mark.parametrize("name,engine", [
+    ("concurrently_altering_ttl_mt","MergeTree()"),
+    ("concurrently_altering_ttl_replicated_mt","ReplicatedMergeTree('/clickhouse/concurrently_altering_ttl_replicated_mt', '1')",),
+])
+def test_concurrent_alter_with_ttl_move(started_cluster, name, engine):
+    try:
+        node1.query("""
+            CREATE TABLE {name} (
+                EventDate Date,
+                number UInt64
+            ) ENGINE = {engine}
+            ORDER BY tuple()
+            PARTITION BY toYYYYMM(EventDate)
+            SETTINGS storage_policy='jbods_with_external'
+        """.format(name=name, engine=engine))
+
+        values = list({ random.randint(1, 1000000) for _ in range(0, 1000) })
+
+        def insert(num):
+            for i in range(num):
+                day = random.randint(11, 30)
+                value = values.pop()
+                month = '0' + str(random.choice([3, 4]))
+                node1.query("INSERT INTO {} VALUES(toDate('2019-{m}-{d}'), {v})".format(name, m=month, d=day, v=value))
+
+        def alter_move(num):
+            def produce_alter_move(node, name):
+                move_type = random.choice(["PART", "PARTITION"])
+                if move_type == "PART":
+                    for _ in range(10):
+                        try:
+                            parts = node1.query("SELECT name from system.parts where table = '{}' and active = 1".format(name)).strip().split('\n')
+                            break
+                        except QueryRuntimeException:
+                            pass
+                    else:
+                        raise Exception("Cannot select from system.parts")
+
+                    move_part = random.choice(["'" + part + "'" for part in parts])
+                else:
+                    move_part = random.choice([201903, 201904])
+
+                move_disk = random.choice(["DISK", "VOLUME"])
+                if move_disk == "DISK":
+                    move_volume = random.choice(["'external'", "'jbod1'", "'jbod2'"])
+                else:
+                    move_volume = random.choice(["'main'", "'external'"])
+                try:
+                    node1.query("ALTER TABLE {} MOVE {mt} {mp} TO {md} {mv}".format(
+                        name, mt=move_type, mp=move_part, md=move_disk, mv=move_volume))
+                except QueryRuntimeException as ex:
+                    pass
+
+            for i in range(num):
+                produce_alter_move(node1, name)
+
+        def alter_update(num):
+            for i in range(num):
+                node1.query("ALTER TABLE {} UPDATE number = number + 1 WHERE 1".format(name))
+
+        def alter_modify_ttl(num):
+            for i in range(num):
+                ttls = []
+                for j in range(random.randint(1, 10)):
+                    what = random.choice(["TO VOLUME 'main'", "TO VOLUME 'external'", "TO DISK 'jbod1'", "TO DISK 'jbod2'", "TO DISK 'external'"])
+                    when = "now()+{}".format(random.randint(-1, 5))
+                    ttls.append("{} {}".format(when, what))
+                node1.query("ALTER TABLE {} MODIFY TTL {}".format(name, ", ".join(ttls)))
+
+        def optimize_table(num):
+            for i in range(num):
+                node1.query("OPTIMIZE TABLE {} FINAL".format(name))
+
+        p = Pool(15)
+        tasks = []
+        for i in range(5):
+            tasks.append(p.apply_async(insert, (100,)))
+            tasks.append(p.apply_async(alter_move, (100,)))
+            tasks.append(p.apply_async(alter_update, (100,)))
+            tasks.append(p.apply_async(alter_modify_ttl, (100,)))
+            tasks.append(p.apply_async(optimize_table, (100,)))
+
+        for task in tasks:
+            task.get(timeout=60)
+
+        assert node1.query("SELECT 1") == "1\n"
+        assert node1.query("SELECT COUNT() FROM {}".format(name)) == "500\n"
+    finally:
+        node1.query("DROP TABLE IF EXISTS {name}".format(name=name))
+
 @pytest.mark.parametrize("name,positive", [
     ("test_double_move_while_select_negative", 0),
     ("test_double_move_while_select_positive", 1),
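The new `test_concurrent_alter_with_ttl_move` test stresses exactly that interaction: concurrent inserts, part/partition moves, mutations, TTL rewrites, and `OPTIMIZE ... FINAL` against the same table. The race it guards against can be reduced to one writer thread replacing the guarded rule list while a reader thread keeps selecting from it; a rough C++ reduction of that scenario (hypothetical names again, not ClickHouse code) is sketched below. With the mutex plus the by-value `std::optional`, the loop is well defined, whereas the pre-patch raw pointer into the vector could dangle as soon as the writer reallocated it.

```cpp
#include <atomic>
#include <mutex>
#include <optional>
#include <string>
#include <thread>
#include <vector>

// Hypothetical miniature of the fixed pattern: a guarded rule list that one
// thread rewrites (like ALTER MODIFY TTL) while another thread reads (like the mover).
struct RuleSketch { std::string result_column; };

std::mutex rules_mutex;
std::vector<RuleSketch> rules;

void rewriteRules(int rounds)
{
    for (int i = 0; i < rounds; ++i)
    {
        std::vector<RuleSketch> fresh{{"d" + std::to_string(i % 10)}};
        std::lock_guard lock(rules_mutex);
        rules = std::move(fresh);   // reallocates: any outside pointer would dangle
    }
}

std::optional<RuleSketch> pickRule()
{
    std::lock_guard lock(rules_mutex);
    if (rules.empty())
        return {};
    return rules.front();           // copied out; safe to use after unlocking
}

int main()
{
    std::atomic<bool> stop{false};
    std::thread writer(rewriteRules, 100000);
    std::thread reader([&]
    {
        while (!stop.load())
            if (auto rule = pickRule())
                (void)rule->result_column;   // would be a use-after-free with a raw pointer
    });
    writer.join();
    stop.store(true);
    reader.join();
}
```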