Add option to disable ttl move on data part insert

This commit is contained in:
Pavel Kovalenko 2020-09-18 18:30:00 +03:00
parent 0052bbdd84
commit 0da19ab46d
7 changed files with 107 additions and 16 deletions

View File

@ -36,10 +36,11 @@ using Volumes = std::vector<VolumePtr>;
class IVolume : public Space
{
public:
IVolume(String name_, Disks disks_, size_t max_data_part_size_ = 0)
IVolume(String name_, Disks disks_, size_t max_data_part_size_ = 0, bool perform_ttl_move_on_insert_ = true)
: disks(std::move(disks_))
, name(name_)
, max_data_part_size(max_data_part_size_)
, perform_ttl_move_on_insert(perform_ttl_move_on_insert_)
{
}
@ -70,6 +71,9 @@ protected:
public:
/// Max size of reservation, zero means unlimited size
UInt64 max_data_part_size = 0;
/// Should a new data part be synchronously moved to a volume according to ttl on insert
/// or move this part in background task asynchronously after insert.
bool perform_ttl_move_on_insert;
};
/// Reservation for multiple disks at once. Can be used in RAID1 implementation.

View File

@ -53,6 +53,9 @@ VolumeJBOD::VolumeJBOD(
static constexpr UInt64 MIN_PART_SIZE = 8u * 1024u * 1024u;
if (max_data_part_size != 0 && max_data_part_size < MIN_PART_SIZE)
LOG_WARNING(logger, "Volume {} max_data_part_size is too low ({} < {})", backQuote(name), ReadableSize(max_data_part_size), ReadableSize(MIN_PART_SIZE));
/// Default value is 'true' due to backward compatibility.
perform_ttl_move_on_insert = config.getBool(config_prefix + ".perform_ttl_move_on_insert", true);
}
DiskPtr VolumeJBOD::getDisk(size_t /* index */) const

View File

@ -2961,11 +2961,12 @@ ReservationPtr MergeTreeData::tryReserveSpace(UInt64 expected_size, SpacePtr spa
ReservationPtr MergeTreeData::reserveSpacePreferringTTLRules(UInt64 expected_size,
const IMergeTreeDataPart::TTLInfos & ttl_infos,
time_t time_of_move,
size_t min_volume_index) const
size_t min_volume_index,
bool is_insert) const
{
expected_size = std::max(RESERVATION_MIN_ESTIMATION_SIZE, expected_size);
ReservationPtr reservation = tryReserveSpacePreferringTTLRules(expected_size, ttl_infos, time_of_move, min_volume_index);
ReservationPtr reservation = tryReserveSpacePreferringTTLRules(expected_size, ttl_infos, time_of_move, min_volume_index, is_insert);
return checkAndReturnReservation(expected_size, std::move(reservation));
}
@ -2973,7 +2974,8 @@ ReservationPtr MergeTreeData::reserveSpacePreferringTTLRules(UInt64 expected_siz
ReservationPtr MergeTreeData::tryReserveSpacePreferringTTLRules(UInt64 expected_size,
const IMergeTreeDataPart::TTLInfos & ttl_infos,
time_t time_of_move,
size_t min_volume_index) const
size_t min_volume_index,
bool is_insert) const
{
expected_size = std::max(RESERVATION_MIN_ESTIMATION_SIZE, expected_size);
@ -2984,13 +2986,13 @@ ReservationPtr MergeTreeData::tryReserveSpacePreferringTTLRules(UInt64 expected_
if (ttl_entry)
{
SpacePtr destination_ptr = getDestinationForTTL(*ttl_entry);
SpacePtr destination_ptr = getDestinationForTTL(*ttl_entry, is_insert);
if (!destination_ptr)
{
if (ttl_entry->destination_type == DataDestinationType::VOLUME)
LOG_WARNING(log, "Would like to reserve space on volume '{}' by TTL rule of table '{}' but volume was not found", ttl_entry->destination_name, log_name);
LOG_WARNING(log, "Would like to reserve space on volume '{}' by TTL rule of table '{}' but volume was not found or rule is not applicable at the moment", ttl_entry->destination_name, log_name);
else if (ttl_entry->destination_type == DataDestinationType::DISK)
LOG_WARNING(log, "Would like to reserve space on disk '{}' by TTL rule of table '{}' but disk was not found", ttl_entry->destination_name, log_name);
LOG_WARNING(log, "Would like to reserve space on disk '{}' by TTL rule of table '{}' but disk was not found or rule is not applicable at the moment", ttl_entry->destination_name, log_name);
}
else
{
@ -3010,13 +3012,36 @@ ReservationPtr MergeTreeData::tryReserveSpacePreferringTTLRules(UInt64 expected_
return reservation;
}
SpacePtr MergeTreeData::getDestinationForTTL(const TTLDescription & ttl) const
SpacePtr MergeTreeData::getDestinationForTTL(const TTLDescription & ttl, bool is_insert) const
{
auto policy = getStoragePolicy();
if (ttl.destination_type == DataDestinationType::VOLUME)
return policy->getVolumeByName(ttl.destination_name);
{
auto volume = policy->getVolumeByName(ttl.destination_name);
if (!volume)
return {};
if (is_insert && !volume->perform_ttl_move_on_insert)
return {};
return volume;
}
else if (ttl.destination_type == DataDestinationType::DISK)
return policy->getDiskByName(ttl.destination_name);
{
auto disk = policy->getDiskByName(ttl.destination_name);
if (!disk)
return {};
auto volume = policy->getVolume(policy->getVolumeIndexByDisk(disk));
if (!volume)
return {};
if (is_insert && !volume->perform_ttl_move_on_insert)
return {};
return disk;
}
else
return {};
}

View File

@ -624,13 +624,15 @@ public:
UInt64 expected_size,
const IMergeTreeDataPart::TTLInfos & ttl_infos,
time_t time_of_move,
size_t min_volume_index = 0) const;
size_t min_volume_index = 0,
bool is_insert = false) const;
ReservationPtr tryReserveSpacePreferringTTLRules(
UInt64 expected_size,
const IMergeTreeDataPart::TTLInfos & ttl_infos,
time_t time_of_move,
size_t min_volume_index = 0) const;
size_t min_volume_index = 0,
bool is_insert = false) const;
/// Choose disk with max available free space
/// Reserves 0 bytes
@ -638,9 +640,9 @@ public:
/// Return alter conversions for part which must be applied on fly.
AlterConversions getAlterConversionsForPart(const MergeTreeDataPartPtr part) const;
/// Returns destination disk or volume for the TTL rule according to current
/// storage policy
SpacePtr getDestinationForTTL(const TTLDescription & ttl) const;
/// Returns destination disk or volume for the TTL rule according to current storage policy
/// 'is_insert' - is TTL move performed on new data part insert.
SpacePtr getDestinationForTTL(const TTLDescription & ttl, bool is_insert = false) const;
/// Checks if given part already belongs destination disk or volume for the
/// TTL rule.

View File

@ -237,7 +237,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithPa
updateTTL(ttl_entry, move_ttl_infos, move_ttl_infos.moves_ttl[ttl_entry.result_column], block, false);
NamesAndTypesList columns = metadata_snapshot->getColumns().getAllPhysical().filter(block.getNames());
ReservationPtr reservation = data.reserveSpacePreferringTTLRules(expected_size, move_ttl_infos, time(nullptr));
ReservationPtr reservation = data.reserveSpacePreferringTTLRules(expected_size, move_ttl_infos, time(nullptr), true);
VolumePtr volume = data.getStoragePolicy()->getVolume(0);
auto new_data_part = data.createPart(

View File

@ -83,6 +83,18 @@
</main>
</volumes>
</only_jbod2>
<jbod_without_instant_ttl_move>
<volumes>
<main>
<disk>jbod1</disk>
</main>
<external>
<disk>external</disk>
<perform_ttl_move_on_insert>false</perform_ttl_move_on_insert>
</external>
</volumes>
</jbod_without_instant_ttl_move>
</policies>
</storage_configuration>

View File

@ -1102,3 +1102,48 @@ limitations under the License."""
finally:
node1.query("DROP TABLE IF EXISTS {name} NO DELAY".format(name=name))
@pytest.mark.parametrize("name,dest_type,engine", [
("mt_test_disabled_ttl_move_on_insert_work", "DISK", "MergeTree()"),
("mt_test_disabled_ttl_move_on_insert_work", "VOLUME", "MergeTree()"),
("replicated_mt_test_disabled_ttl_move_on_insert_work", "DISK", "ReplicatedMergeTree('/clickhouse/replicated_test_disabled_ttl_move_on_insert_work', '1')"),
("replicated_mt_test_disabled_ttl_move_on_insert_work", "VOLUME", "ReplicatedMergeTree('/clickhouse/replicated_test_disabled_ttl_move_on_insert_work', '1')"),
])
def test_disabled_ttl_move_on_insert(started_cluster, name, dest_type, engine):
try:
node1.query("SYSTEM STOP MOVES")
node1.query("""
CREATE TABLE {name} (
s1 String,
d1 DateTime
) ENGINE = {engine}
ORDER BY tuple()
TTL d1 TO {dest_type} 'external'
SETTINGS storage_policy='jbod_without_instant_ttl_move'
""".format(name=name, dest_type=dest_type, engine=engine))
data = [] # 10MB in total
for i in range(10):
data.append(("'{}'".format(get_random_string(1024 * 1024)), "toDateTime({})".format(
time.time() - 1))) # 1MB row
node1.query("INSERT INTO {} (s1, d1) VALUES {}".format(name, ",".join(["(" + ",".join(x) + ")" for x in data])))
used_disks = get_used_disks_for_table(node1, name)
assert set(used_disks) == "jbod1"
assert node1.query("SELECT count() FROM {name}".format(name=name)).strip() == "10"
node1.query("SYSTEM START MOVES")
time.sleep(3)
used_disks = get_used_disks_for_table(node1, name)
assert set(used_disks) == "external"
assert node1.query("SELECT count() FROM {name}".format(name=name)).strip() == "10"
finally:
try:
node1.query("DROP TABLE IF EXISTS {} NO DELAY".format(name))
except:
pass