Don't move anything if part already belongs their destination.

This commit is contained in:
Vladimir Chebotarev 2019-11-29 10:00:43 +03:00
parent f72da4ab0c
commit 618a39cc8c
5 changed files with 98 additions and 90 deletions

View File

@ -3153,73 +3153,95 @@ DiskSpace::ReservationPtr MergeTreeData::reserveSpace(UInt64 expected_size) cons
DiskSpace::ReservationPtr MergeTreeData::reserveSpacePreferringMoveDestination(UInt64 expected_size,
const MergeTreeDataPart::TTLInfos & ttl_infos,
time_t minimum_time) const
time_t time_of_move) const
{
expected_size = std::max(RESERVATION_MIN_ESTIMATION_SIZE, expected_size);
auto reservation = tryReserveSpaceOnMoveDestination(expected_size, ttl_infos, minimum_time);
if (reservation)
return reservation;
DiskSpace::ReservationPtr reservation;
auto ttl_entry = selectMoveDestination(ttl_infos, time_of_move);
if (ttl_entry != nullptr)
{
DiskSpace::SpacePtr destination_ptr = ttl_entry->getDestination(storage_policy);
if (!destination_ptr)
{
if (ttl_entry->destination_type == PartDestinationType::VOLUME)
LOG_WARNING(log, "Would like to reserve space on volume '"
<< ttl_entry->destination_name << "' by TTL rule of table '"
<< log_name << "' but volume was not found");
else if (ttl_entry->destination_type == PartDestinationType::DISK)
LOG_WARNING(log, "Would like to reserve space on disk '"
<< ttl_entry->destination_name << "' by TTL rule of table '"
<< log_name << "' but disk was not found");
}
else
{
reservation = destination_ptr->reserve(expected_size);
if (reservation)
return reservation;
}
}
reservation = storage_policy->reserve(expected_size);
return returnReservationOrThrowError(expected_size, std::move(reservation));
}
DiskSpace::ReservationPtr MergeTreeData::tryReserveSpaceOnMoveDestination(UInt64 expected_size,
const MergeTreeDataPart::TTLInfos & ttl_infos,
time_t minimum_time) const
DiskSpace::ReservationPtr MergeTreeData::reserveSpaceInSpecificSpace(UInt64 expected_size, DiskSpace::SpacePtr space) const
{
expected_size = std::max(RESERVATION_MIN_ESTIMATION_SIZE, expected_size);
auto ttl_entry = selectMoveDestination(ttl_infos, minimum_time);
if (ttl_entry != nullptr)
{
DiskSpace::ReservationPtr reservation;
if (ttl_entry->destination_type == PartDestinationType::VOLUME)
{
auto volume_ptr = storage_policy->getVolumeByName(ttl_entry->destination_name);
if (volume_ptr)
{
reservation = volume_ptr->reserve(expected_size);
}
else
{
LOG_WARNING(log, "Would like to reserve space on volume '"
<< ttl_entry->destination_name << "' by TTL rule of table '"
<< log_name << "' but volume was not found");
}
}
else if (ttl_entry->destination_type == PartDestinationType::DISK)
{
auto disk_ptr = storage_policy->getDiskByName(ttl_entry->destination_name);
if (disk_ptr)
{
reservation = disk_ptr->reserve(expected_size);
}
else
{
LOG_WARNING(log, "Would like to reserve space on disk '"
<< ttl_entry->destination_name << "' by TTL rule of table '"
<< log_name << "' but disk was not found");
}
}
if (reservation)
return reservation;
}
return {};
}
DiskSpace::ReservationPtr MergeTreeData::reserveSpaceOnSpecificDisk(UInt64 expected_size, DiskSpace::DiskPtr disk) const
{
expected_size = std::max(RESERVATION_MIN_ESTIMATION_SIZE, expected_size);
auto reservation = disk->reserve(expected_size);
auto reservation = space->reserve(expected_size);
return returnReservationOrThrowError(expected_size, std::move(reservation));
}
DiskSpace::SpacePtr MergeTreeData::TTLEntry::getDestination(const DiskSpace::StoragePolicyPtr & storage_policy) const
{
if (destination_type == PartDestinationType::VOLUME)
return storage_policy->getVolumeByName(destination_name);
else if (destination_type == PartDestinationType::DISK)
return storage_policy->getDiskByName(destination_name);
else
return {};
}
bool MergeTreeData::TTLEntry::isPartInDestination(const DiskSpace::StoragePolicyPtr & storage_policy, const MergeTreeDataPart & part) const
{
if (destination_type == PartDestinationType::VOLUME)
{
for (const auto & disk : storage_policy->getVolumeByName(destination_name)->disks)
if (disk->getName() == part.disk->getName())
return true;
}
else if (destination_type == PartDestinationType::DISK)
return storage_policy->getDiskByName(destination_name)->getName() == part.disk->getName();
return false;
}
const MergeTreeData::TTLEntry * MergeTreeData::selectMoveDestination(
const MergeTreeDataPart::TTLInfos & ttl_infos,
time_t time_of_move) const
{
const MergeTreeData::TTLEntry * result = nullptr;
/// Prefer TTL rule which went into action last.
time_t max_max_ttl = 0;
for (const auto & ttl_entry : move_ttl_entries)
{
auto ttl_info_it = ttl_infos.moves_ttl.find(ttl_entry.result_column);
if (ttl_info_it != ttl_infos.moves_ttl.end()
&& ttl_info_it->second.max <= time_of_move
&& max_max_ttl >= ttl_info_it->second.max)
{
result = &ttl_entry;
max_max_ttl = ttl_info_it->second.max;
}
}
return result;
}
MergeTreeData::DataParts MergeTreeData::getDataParts(const DataPartStates & affordable_states) const
{
DataParts res;
@ -3399,7 +3421,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeData::cloneAndLoadDataPartOnSameDisk(
String dst_part_name = src_part->getNewName(dst_part_info);
String tmp_dst_part_name = tmp_part_prefix + dst_part_name;
auto reservation = reserveSpaceOnSpecificDisk(src_part->bytes_on_disk, src_part->disk);
auto reservation = reserveSpaceInSpecificSpace(src_part->bytes_on_disk, src_part->disk);
String dst_part_path = getFullPathOnDisk(reservation->getDisk());
Poco::Path dst_part_absolute_path = Poco::Path(dst_part_path + tmp_dst_part_name).absolute();
Poco::Path src_part_absolute_path = Poco::Path(src_part->getFullPath()).absolute();
@ -3722,27 +3744,4 @@ bool MergeTreeData::moveParts(CurrentlyMovingPartsTagger && moving_tagger)
return true;
}
const MergeTreeData::TTLEntry * MergeTreeData::selectMoveDestination(
const MergeTreeDataPart::TTLInfos & ttl_infos,
time_t minimum_time) const
{
const MergeTreeData::TTLEntry * result = nullptr;
/// Prefer TTL rule which went into action last.
time_t max_max_ttl = 0;
for (const auto & ttl_entry : move_ttl_entries)
{
auto ttl_info_it = ttl_infos.moves_ttl.find(ttl_entry.result_column);
if (ttl_info_it != ttl_infos.moves_ttl.end()
&& ttl_info_it->second.max <= minimum_time
&& max_max_ttl >= ttl_info_it->second.max)
{
result = &ttl_entry;
max_max_ttl = ttl_info_it->second.max;
}
}
return result;
}
}

View File

@ -678,11 +678,8 @@ public:
DiskSpace::ReservationPtr reserveSpace(UInt64 expected_size) const;
DiskSpace::ReservationPtr reserveSpacePreferringMoveDestination(UInt64 expected_size,
const MergeTreeDataPart::TTLInfos & ttl_infos,
time_t minimum_time) const;
DiskSpace::ReservationPtr tryReserveSpaceOnMoveDestination(UInt64 expected_size,
const MergeTreeDataPart::TTLInfos & ttl_infos,
time_t minimum_time) const;
DiskSpace::ReservationPtr reserveSpaceOnSpecificDisk(UInt64 expected_size, DiskSpace::DiskPtr disk) const;
time_t time_of_move) const;
DiskSpace::ReservationPtr reserveSpaceInSpecificSpace(UInt64 expected_size, DiskSpace::SpacePtr space) const;
/// Choose disk with max available free space
/// Reserves 0 bytes
@ -733,8 +730,13 @@ public:
String destination_name;
ASTPtr entry_ast;
DiskSpace::SpacePtr getDestination(const DiskSpace::StoragePolicyPtr & storage_policy) const;
bool isPartInDestination(const DiskSpace::StoragePolicyPtr & storage_policy, const MergeTreeDataPart & part) const;
};
const TTLEntry * selectMoveDestination(const MergeTreeDataPart::TTLInfos & ttl_infos, time_t time_of_move) const;
using TTLEntriesByName = std::unordered_map<String, TTLEntry>;
TTLEntriesByName column_ttl_entries_by_name;
@ -978,9 +980,6 @@ private:
/// Check selected parts for movements. Used by ALTER ... MOVE queries.
CurrentlyMovingPartsTagger checkPartsForMove(const DataPartsVector & parts, DiskSpace::SpacePtr space);
const MergeTreeData::TTLEntry * selectMoveDestination(const MergeTreeDataPart::TTLInfos & ttl_infos,
time_t minimum_time) const;
};
}

View File

@ -114,15 +114,25 @@ bool MergeTreePartsMover::selectPartsForMove(
}
}
time_t time_of_move = time(nullptr);
for (const auto & part : data_parts)
{
String reason;
/// Don't report message to log, because logging is excessive
/// Don't report message to log, because logging is excessive.
if (!can_move(part, &reason))
continue;
auto reservation = part->storage.tryReserveSpaceOnMoveDestination(part->bytes_on_disk, part->ttl_infos, time(nullptr));
const MergeTreeData::TTLEntry * ttl_entry_ptr = part->storage.selectMoveDestination(part->ttl_infos, time_of_move);
auto to_insert = need_to_move.find(part->disk);
DiskSpace::ReservationPtr reservation;
if (ttl_entry_ptr)
{
auto destination = ttl_entry_ptr->getDestination(policy);
if (destination && !ttl_entry_ptr->isPartInDestination(policy, *part))
reservation = part->storage.reserveSpaceInSpecificSpace(part->bytes_on_disk, ttl_entry_ptr->getDestination(policy));
}
if (reservation)
{
parts_to_move.emplace_back(part, std::move(reservation));
@ -149,9 +159,9 @@ bool MergeTreePartsMover::selectPartsForMove(
auto reservation = policy->reserve(part->bytes_on_disk, min_volume_index);
if (!reservation)
{
/// Next parts to move from this disk has greater size and same min volume index
/// There are no space for them
/// But it can be possible to move data from other disks
/// Next parts to move from this disk has greater size and same min volume index.
/// There are no space for them.
/// But it can be possible to move data from other disks.
break;
}
parts_to_move.emplace_back(part, std::move(reservation));

View File

@ -350,7 +350,7 @@ public:
/// if we mutate part, than we should reserve space on the same disk, because mutations possible can create hardlinks
if (is_mutation)
reserved_space = storage.reserveSpaceOnSpecificDisk(total_size, future_part_.parts[0]->disk);
reserved_space = storage.reserveSpaceInSpecificSpace(total_size, future_part_.parts[0]->disk);
else
{
MergeTreeDataPart::TTLInfos ttl_infos;

View File

@ -1147,7 +1147,7 @@ bool StorageReplicatedMergeTree::tryExecutePartMutation(const StorageReplicatedM
/// Once we mutate part, we must reserve space on the same disk, because mutations can possibly create hardlinks.
/// Can throw an exception.
DiskSpace::ReservationPtr reserved_space = reserveSpaceOnSpecificDisk(estimated_space_for_result, source_part->disk);
DiskSpace::ReservationPtr reserved_space = reserveSpaceInSpecificSpace(estimated_space_for_result, source_part->disk);
auto table_lock = lockStructureForShare(false, RWLockImpl::NO_QUERY);