diff --git a/src/Disks/IVolume.h b/src/Disks/IVolume.h
index 5e7f09e1d04..0f38fe4d22e 100644
--- a/src/Disks/IVolume.h
+++ b/src/Disks/IVolume.h
@@ -36,10 +36,11 @@ using Volumes = std::vector<VolumePtr>;
 class IVolume : public Space
 {
 public:
-    IVolume(String name_, Disks disks_, size_t max_data_part_size_ = 0)
+    IVolume(String name_, Disks disks_, size_t max_data_part_size_ = 0, bool perform_ttl_move_on_insert_ = true)
         : disks(std::move(disks_))
         , name(name_)
         , max_data_part_size(max_data_part_size_)
+        , perform_ttl_move_on_insert(perform_ttl_move_on_insert_)
     {
     }
 
@@ -70,6 +71,9 @@ protected:
 public:
     /// Max size of reservation, zero means unlimited size
     UInt64 max_data_part_size = 0;
+    /// If true, a new data part is synchronously moved to this volume according to ttl on insert;
+    /// if false, the part is moved by the background task after insert. 'true' keeps the old behaviour.
+    bool perform_ttl_move_on_insert = true;
 };
 
 /// Reservation for multiple disks at once. Can be used in RAID1 implementation.
diff --git a/src/Disks/VolumeJBOD.cpp b/src/Disks/VolumeJBOD.cpp
index bf9dcf7f5d8..3ac8a50acfb 100644
--- a/src/Disks/VolumeJBOD.cpp
+++ b/src/Disks/VolumeJBOD.cpp
@@ -53,6 +53,9 @@ VolumeJBOD::VolumeJBOD(
     static constexpr UInt64 MIN_PART_SIZE = 8u * 1024u * 1024u;
     if (max_data_part_size != 0 && max_data_part_size < MIN_PART_SIZE)
         LOG_WARNING(logger, "Volume {} max_data_part_size is too low ({} < {})", backQuote(name), ReadableSize(max_data_part_size), ReadableSize(MIN_PART_SIZE));
+
+    /// Default value is 'true' due to backward compatibility.
+    perform_ttl_move_on_insert = config.getBool(config_prefix + ".perform_ttl_move_on_insert", true);
 }
 
 DiskPtr VolumeJBOD::getDisk(size_t /* index */) const
diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp
index 5969ecc5baf..853ea8eabcf 100644
--- a/src/Storages/MergeTree/MergeTreeData.cpp
+++ b/src/Storages/MergeTree/MergeTreeData.cpp
@@ -2961,11 +2961,12 @@ ReservationPtr MergeTreeData::tryReserveSpace(UInt64 expected_size, SpacePtr spa
 ReservationPtr MergeTreeData::reserveSpacePreferringTTLRules(UInt64 expected_size,
         const IMergeTreeDataPart::TTLInfos & ttl_infos,
         time_t time_of_move,
-        size_t min_volume_index) const
+        size_t min_volume_index,
+        bool is_insert) const
 {
     expected_size = std::max(RESERVATION_MIN_ESTIMATION_SIZE, expected_size);
 
-    ReservationPtr reservation = tryReserveSpacePreferringTTLRules(expected_size, ttl_infos, time_of_move, min_volume_index);
+    ReservationPtr reservation = tryReserveSpacePreferringTTLRules(expected_size, ttl_infos, time_of_move, min_volume_index, is_insert);
 
     return checkAndReturnReservation(expected_size, std::move(reservation));
 }
@@ -2973,7 +2974,8 @@ ReservationPtr MergeTreeData::reserveSpacePreferringTTLRules(UInt64 expected_siz
 ReservationPtr MergeTreeData::tryReserveSpacePreferringTTLRules(UInt64 expected_size,
         const IMergeTreeDataPart::TTLInfos & ttl_infos,
         time_t time_of_move,
-        size_t min_volume_index) const
+        size_t min_volume_index,
+        bool is_insert) const
 {
     expected_size = std::max(RESERVATION_MIN_ESTIMATION_SIZE, expected_size);
 
@@ -2984,13 +2986,13 @@ ReservationPtr MergeTreeData::tryReserveSpacePreferringTTLRules(UInt64 expected_
 
     if (ttl_entry)
     {
-        SpacePtr destination_ptr = getDestinationForTTL(*ttl_entry);
+        SpacePtr destination_ptr = getDestinationForTTL(*ttl_entry, is_insert);
         if (!destination_ptr)
         {
             if (ttl_entry->destination_type == DataDestinationType::VOLUME)
-                LOG_WARNING(log, "Would like to reserve space on volume '{}' by TTL rule of table '{}' but volume was not found", ttl_entry->destination_name, log_name);
+                LOG_WARNING(log, "Would like to reserve space on volume '{}' by TTL rule of table '{}' but volume was not found or rule is not applicable at the moment", ttl_entry->destination_name, log_name);
             else if (ttl_entry->destination_type == DataDestinationType::DISK)
-                LOG_WARNING(log, "Would like to reserve space on disk '{}' by TTL rule of table '{}' but disk was not found", ttl_entry->destination_name, log_name);
+                LOG_WARNING(log, "Would like to reserve space on disk '{}' by TTL rule of table '{}' but disk was not found or rule is not applicable at the moment", ttl_entry->destination_name, log_name);
         }
         else
         {
@@ -3010,13 +3012,39 @@ ReservationPtr MergeTreeData::tryReserveSpacePreferringTTLRules(UInt64 expected_
     return reservation;
 }
 
-SpacePtr MergeTreeData::getDestinationForTTL(const TTLDescription & ttl) const
+SpacePtr MergeTreeData::getDestinationForTTL(const TTLDescription & ttl, bool is_insert) const
 {
     auto policy = getStoragePolicy();
     if (ttl.destination_type == DataDestinationType::VOLUME)
-        return policy->getVolumeByName(ttl.destination_name);
+    {
+        auto volume = policy->getVolumeByName(ttl.destination_name);
+
+        if (!volume)
+            return {};
+
+        /// On insert the rule is skipped if the volume leaves TTL moves to the background task.
+        if (is_insert && !volume->perform_ttl_move_on_insert)
+            return {};
+
+        return volume;
+    }
     else if (ttl.destination_type == DataDestinationType::DISK)
-        return policy->getDiskByName(ttl.destination_name);
+    {
+        auto disk = policy->getDiskByName(ttl.destination_name);
+        if (!disk)
+            return {};
+
+        /// 'perform_ttl_move_on_insert' belongs to a volume, so look up the volume that contains the disk.
+        auto volume = policy->getVolume(policy->getVolumeIndexByDisk(disk));
+        if (!volume)
+            return {};
+
+        /// Same check as for a volume destination.
+        if (is_insert && !volume->perform_ttl_move_on_insert)
+            return {};
+
+        return disk;
+    }
     else
         return {};
 }
diff --git a/src/Storages/MergeTree/MergeTreeData.h b/src/Storages/MergeTree/MergeTreeData.h
index 82f118a4c0f..59628371ac8 100644
--- a/src/Storages/MergeTree/MergeTreeData.h
+++ b/src/Storages/MergeTree/MergeTreeData.h
@@ -624,13 +624,15 @@ public:
         UInt64 expected_size,
         const IMergeTreeDataPart::TTLInfos & ttl_infos,
         time_t time_of_move,
-        size_t min_volume_index = 0) const;
+        size_t min_volume_index = 0,
+        bool is_insert = false) const;
 
     ReservationPtr tryReserveSpacePreferringTTLRules(
         UInt64 expected_size,
         const IMergeTreeDataPart::TTLInfos & ttl_infos,
         time_t time_of_move,
-        size_t min_volume_index = 0) const;
+        size_t min_volume_index = 0,
+        bool is_insert = false) const;
 
     /// Choose disk with max available free space
     /// Reserves 0 bytes
@@ -638,9 +640,10 @@ public:
 
     /// Return alter conversions for part which must be applied on fly.
     AlterConversions getAlterConversionsForPart(const MergeTreeDataPartPtr part) const;
-    /// Returns destination disk or volume for the TTL rule according to current
-    /// storage policy
-    SpacePtr getDestinationForTTL(const TTLDescription & ttl) const;
+    /// Returns destination disk or volume for the TTL rule according to current storage policy.
+    /// 'is_insert' - is TTL move performed on new data part insert. In that case an empty pointer
+    /// is returned if the destination volume has 'perform_ttl_move_on_insert' disabled.
+    SpacePtr getDestinationForTTL(const TTLDescription & ttl, bool is_insert = false) const;
 
     /// Checks if given part already belongs destination disk or volume for the
     /// TTL rule.
diff --git a/src/Storages/MergeTree/MergeTreeDataWriter.cpp b/src/Storages/MergeTree/MergeTreeDataWriter.cpp
index e5b684a1361..b8de87ecd3a 100644
--- a/src/Storages/MergeTree/MergeTreeDataWriter.cpp
+++ b/src/Storages/MergeTree/MergeTreeDataWriter.cpp
@@ -237,7 +237,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithPa
         updateTTL(ttl_entry, move_ttl_infos, move_ttl_infos.moves_ttl[ttl_entry.result_column], block, false);
 
     NamesAndTypesList columns = metadata_snapshot->getColumns().getAllPhysical().filter(block.getNames());
-    ReservationPtr reservation = data.reserveSpacePreferringTTLRules(expected_size, move_ttl_infos, time(nullptr));
+    ReservationPtr reservation = data.reserveSpacePreferringTTLRules(expected_size, move_ttl_infos, time(nullptr), /* min_volume_index = */ 0, /* is_insert = */ true);
     VolumePtr volume = data.getStoragePolicy()->getVolume(0);
 
     auto new_data_part = data.createPart(
diff --git a/tests/integration/test_ttl_move/configs/config.d/storage_configuration.xml b/tests/integration/test_ttl_move/configs/config.d/storage_configuration.xml
index 47bf9f56cdd..e96bde89ca9 100644
--- a/tests/integration/test_ttl_move/configs/config.d/storage_configuration.xml
+++ b/tests/integration/test_ttl_move/configs/config.d/storage_configuration.xml
@@ -85,0 +86,13 @@
+
+        <!-- New parts stay on 'main' on insert; the TTL move to 'external' is left to the background task. -->
+        <jbod_without_instant_ttl_move>
+            <volumes>
+                <main>
+                    <disk>jbod1</disk>
+                </main>
+                <external>
+                    <disk>external</disk>
+                    <perform_ttl_move_on_insert>false</perform_ttl_move_on_insert>
+                </external>
+            </volumes>
+        </jbod_without_instant_ttl_move>
diff --git a/tests/integration/test_ttl_move/test.py b/tests/integration/test_ttl_move/test.py
index ad822bc6545..cd822025609 100644
--- a/tests/integration/test_ttl_move/test.py
+++ b/tests/integration/test_ttl_move/test.py
@@ -1102,3 +1102,51 @@ limitations under the License."""
 
     finally:
         node1.query("DROP TABLE IF EXISTS {name} NO DELAY".format(name=name))
+
+
+@pytest.mark.parametrize("name,dest_type,engine", [
+    ("mt_test_disabled_ttl_move_on_insert_work", "DISK", "MergeTree()"),
+    ("mt_test_disabled_ttl_move_on_insert_work", "VOLUME", "MergeTree()"),
+    ("replicated_mt_test_disabled_ttl_move_on_insert_work", "DISK", "ReplicatedMergeTree('/clickhouse/replicated_test_disabled_ttl_move_on_insert_work', '1')"),
+    ("replicated_mt_test_disabled_ttl_move_on_insert_work", "VOLUME", "ReplicatedMergeTree('/clickhouse/replicated_test_disabled_ttl_move_on_insert_work', '1')"),
+])
+def test_disabled_ttl_move_on_insert(started_cluster, name, dest_type, engine):
+    try:
+        # Background moves are stopped so that only the insert path decides where the part lands.
+        node1.query("SYSTEM STOP MOVES")
+
+        node1.query("""
+            CREATE TABLE {name} (
+                s1 String,
+                d1 DateTime
+            ) ENGINE = {engine}
+            ORDER BY tuple()
+            TTL d1 TO {dest_type} 'external'
+            SETTINGS storage_policy='jbod_without_instant_ttl_move'
+        """.format(name=name, dest_type=dest_type, engine=engine))
+
+        data = []  # 10MB in total
+        for _ in range(10):
+            data.append(("'{}'".format(get_random_string(1024 * 1024)), "toDateTime({})".format(
+                time.time() - 1)))  # 1MB row
+
+        node1.query("INSERT INTO {} (s1, d1) VALUES {}".format(name, ",".join(["(" + ",".join(x) + ")" for x in data])))
+
+        # The TTL of every row has already expired, but the move must not happen on insert.
+        used_disks = get_used_disks_for_table(node1, name)
+        assert set(used_disks) == {"jbod1"}
+        assert node1.query("SELECT count() FROM {name}".format(name=name)).strip() == "10"
+
+        node1.query("SYSTEM START MOVES")
+        time.sleep(3)
+
+        # With moves started again the part is expected on the TTL destination.
+        used_disks = get_used_disks_for_table(node1, name)
+        assert set(used_disks) == {"external"}
+        assert node1.query("SELECT count() FROM {name}".format(name=name)).strip() == "10"
+
+    finally:
+        try:
+            node1.query("DROP TABLE IF EXISTS {} NO DELAY".format(name))
+        except Exception:  # best-effort cleanup; must not mask a test failure
+            pass