ClickHouse/dbms/src/Storages/MergeTree/MergeTreePartsMover.cpp

212 lines
6.5 KiB
C++
Raw Normal View History

2019-08-19 14:40:12 +00:00
#include <Storages/MergeTree/MergeTreePartsMover.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <set>
#include <boost/algorithm/string/join.hpp>
namespace DB
{
2019-09-02 11:35:53 +00:00
/// Error codes thrown from this file. Declared `extern` here; the definitions live in another translation unit.
namespace ErrorCodes
{
/// Thrown by cloneParts() / swapClonedParts() when moves_blocker reports cancellation.
extern const int ABORTED;
}
2019-08-19 14:40:12 +00:00
namespace
{
2019-09-02 11:35:53 +00:00
2019-08-20 17:16:32 +00:00
/// Contains minimal number of heaviest parts, which sum size on disk is greater than required.
/// If there are not enough summary size, than contains all.
2019-08-19 14:40:12 +00:00
class LargestPartsWithRequiredSize
{
struct PartsSizeOnDiskComparator
{
2019-09-02 11:35:53 +00:00
bool operator()(const MergeTreeData::DataPartPtr & f, const MergeTreeData::DataPartPtr & s) const
2019-08-19 14:40:12 +00:00
{
return f->bytes_on_disk < s->bytes_on_disk;
}
};
std::set<MergeTreeData::DataPartPtr, PartsSizeOnDiskComparator> elems;
UInt64 required_size_sum;
UInt64 current_size_sum = 0;
public:
2019-09-02 11:35:53 +00:00
LargestPartsWithRequiredSize(UInt64 required_sum_size_) : required_size_sum(required_sum_size_) {}
2019-08-19 14:40:12 +00:00
void add(MergeTreeData::DataPartPtr part)
{
if (current_size_sum < required_size_sum)
{
elems.emplace(part);
current_size_sum += part->bytes_on_disk;
return;
}
/// Adding smaller element
if (!elems.empty() && (*elems.begin())->bytes_on_disk >= part->bytes_on_disk)
return;
elems.emplace(part);
current_size_sum += part->bytes_on_disk;
while (!elems.empty() && (current_size_sum - (*elems.begin())->bytes_on_disk >= required_size_sum))
{
current_size_sum -= (*elems.begin())->bytes_on_disk;
elems.erase(elems.begin());
}
}
/// Returns elems ordered by size
MergeTreeData::DataPartsVector getAccumulatedParts()
{
MergeTreeData::DataPartsVector res;
for (const auto & elem : elems)
res.push_back(elem);
return res;
}
};
}
/// Selects parts to move away from disks that have too little available space.
///
/// The disks of every volume except the last one are checked. A disk needs moves when its
/// available space is less than `total space * move factor` of the storage policy. For such a disk,
/// the minimal set of its largest movable parts covering the deficit is chosen. Space for each of
/// these parts is reserved with `min_volume_priority` one greater than the source disk's volume priority.
///
/// @param parts_to_move  Output: selected parts together with their reservations are appended to it.
/// @param can_move       Predicate telling whether a part may be moved; fills the reason when it may not.
/// @return true if `parts_to_move` is non-empty on exit.
bool MergeTreePartsMover::selectPartsToMove(
    MergeTreeMovingParts & parts_to_move,
    const AllowedMovingPredicate & can_move)
{
    const auto & policy = data.getStoragePolicy();
    const auto & volumes = policy->getVolumes();

    /// Nothing to do unless there are at least two volumes: the last volume is never a move source.
    /// Checking `<= 1` (not `== 1`) also keeps the loop bound below safe for an empty volume list.
    if (volumes.size() <= 1)
        return false;

    MergeTreeData::DataPartsVector data_parts = data.getDataPartsVector();
    if (data_parts.empty())
        return false;

    /// Disk -> parts to move from it, sized to free (total * move factor - available) bytes.
    std::unordered_map<DiskSpace::DiskPtr, LargestPartsWithRequiredSize> need_to_move;

    /// Do not check last volume
    for (size_t i = 0; i + 1 < volumes.size(); ++i)
    {
        for (const auto & disk : volumes[i]->disks)
        {
            auto space_information = disk->getSpaceInformation();
            UInt64 total_space_with_factor = space_information.getTotalSpace() * policy->getMoveFactor();
            /// Read once so that the comparison and the deficit use the same value.
            const auto available_space = space_information.getAvailableSpace();
            /// Do not take into account reserved space
            if (total_space_with_factor > available_space)
                need_to_move.emplace(disk, total_space_with_factor - available_space);
        }
    }

    for (const auto & part : data_parts)
    {
        String reason;
        if (!can_move(part, &reason))
        {
            LOG_TRACE(log, "Cannot select part '" << part->name << "' to move, becase " << reason);
            continue;
        }

        auto to_insert = need_to_move.find(part->disk);
        if (to_insert != need_to_move.end())
            to_insert->second.add(part);
    }

    for (auto && move : need_to_move)
    {
        auto min_volume_priority = policy->getVolumePriorityByDisk(move.first) + 1;
        /// Accumulated parts come ordered by size, smallest first.
        for (auto && part : move.second.getAccumulatedParts())
        {
            auto reservation = policy->reserve(part->bytes_on_disk, min_volume_priority);
            if (!reservation)
            {
                /// Next parts to move from this disk have greater size and the same min volume priority,
                /// so there is no space for them either.
                /// But it can still be possible to move data from other disks.
                break;
            }

            parts_to_move.emplace_back(part, std::move(reservation));
        }
    }

    return !parts_to_move.empty();
}
/// Copies every part of `parts` into the `detached/` directory of the disk it has space reserved on,
/// and loads each copy (columns, checksums, indexes) as a new data part object.
///
/// Cancellation is checked before each copy. If the move was cancelled, the copies made so far
/// are removed from disk and Exception(ABORTED) is thrown.
///
/// @return The loaded copies, in the same order as `parts`.
MergeTreeData::DataPartsVector MergeTreePartsMover::cloneParts(const MergeTreeMovingParts & parts)
{
    MergeTreeData::DataPartsVector cloned;

    for (const auto & moving : parts)
    {
        if (moves_blocker.isCancelled())
        {
            /// Roll back: drop every copy created so far before reporting cancellation.
            for (auto & already_cloned : cloned)
                already_cloned->remove();
            throw Exception("Cancelled moving parts.", ErrorCodes::ABORTED);
        }

        const auto & source = moving.part;

        LOG_TRACE(log, "Cloning part " << source->name);
        source->makeCloneOnDiskDetached(moving.reserved_space);

        MergeTreeData::MutableDataPartPtr copy
            = std::make_shared<MergeTreeData::DataPart>(data, moving.reserved_space->getDisk(), source->name);
        /// The copy lives under detached/ until swapClonedParts() renames it.
        copy->relative_path = "detached/" + source->name;
        LOG_TRACE(log, "Part " << source->name << " was cloned to " << copy->getFullPath());

        copy->loadColumnsChecksumsIndexes(true, true);
        cloned.push_back(copy);
    }

    return cloned;
}
/// Replaces active parts with their moved copies.
///
/// For each copy, the active part containing its name is looked up. If an active part with exactly
/// that name exists, the copy is renamed to it and swapped in via data.swapActivePart(). Otherwise
/// the copy is removed from disk and recorded as failed.
///
/// @param cloned_parts  Copies to swap in (presumably the result of cloneParts()).
/// @param out_reason    If not null, receives a description of the failed parts when false is returned.
/// @return true if every copy was swapped in, false if some active parts no longer existed.
/// @throws Exception (ErrorCodes::ABORTED) if moves are cancelled; the not yet processed copies
///         are removed from disk first.
bool MergeTreePartsMover::swapClonedParts(const MergeTreeData::DataPartsVector & cloned_parts, String * out_reason)
{
    std::vector<String> failed_parts;

    /// Index-based loop: on cancellation only copies from the current index on still need removal,
    /// earlier ones were already either swapped in or removed.
    for (size_t i = 0; i < cloned_parts.size(); ++i)
    {
        if (moves_blocker.isCancelled())
        {
            /// Removing all copied parts from disk
            for (size_t j = i; j < cloned_parts.size(); ++j)
                cloned_parts[j]->remove();
            throw Exception("Cancelled moving parts.", ErrorCodes::ABORTED);
        }

        const auto & cloned_part = cloned_parts[i];
        auto part = data.getActiveContainingPart(cloned_part->name);
        if (!part || part->name != cloned_part->name)
        {
            LOG_INFO(log, "Failed to swap " << cloned_part->name << ". Active part doesn't exist."
                << " It can be removed by merge or deleted by hand. Will remove copy on path '"
                << cloned_part->getFullPath() << "'.");
            failed_parts.push_back(cloned_part->name);
            cloned_part->remove();
            continue;
        }

        cloned_part->renameTo(part->name);
        data.swapActivePart(cloned_part);
    }

    if (failed_parts.empty())
        return true;

    /// out_reason is an optional out-parameter: only fill it when the caller asked for it.
    if (out_reason)
    {
        std::ostringstream oss;
        oss << "Failed to swap parts: ";
        oss << boost::algorithm::join(failed_parts, ", ");
        oss << ". Their active parts doesn't exist.";
        *out_reason = oss.str();
    }
    return false;
}
}