Fixed ttl move logic in background move task and added ttl enforcement on inserts.

This commit is contained in:
Vladimir Chebotarev 2019-10-29 02:01:07 +03:00
parent 4cd75f926b
commit 76c4ac9f60
2 changed files with 55 additions and 10 deletions

View File

@ -211,10 +211,56 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithPa
else
part_name = new_part_info.getPartName();
/// Size of part would not be grater than block.bytes() + epsilon
size_t expected_size = block.bytes();
auto reservation = data.reserveSpace(expected_size);
auto current_time = time(nullptr);
const MergeTreeData::MoveTTLEntry * best_ttl_entry = nullptr;
time_t max_min_ttl = 0;
DB::MergeTreeDataPart::TTLInfos move_ttl_infos;
for (const auto & [expression, ttl_entry] : data.move_ttl_entries_by_name)
{
auto & ttl_info = move_ttl_infos.moves_ttl[expression];
updateTTL(ttl_entry, move_ttl_infos, ttl_info, block);
if (ttl_info.min > current_time && max_min_ttl < ttl_info.min)
{
best_ttl_entry = &ttl_entry;
max_min_ttl = ttl_info.min;
}
}
DiskSpace::ReservationPtr reservation;
/// Size of part would not be greater than block.bytes() + epsilon
size_t expected_size = block.bytes();
if (best_ttl_entry != nullptr)
{
if (best_ttl_entry->destination_type == ASTTTLElement::DestinationType::VOLUME)
{
auto volume_ptr = data.getStoragePolicy()->getVolumeByName(best_ttl_entry->destination_name);
if (volume_ptr)
{
reservation = volume_ptr->reserve(expected_size);
}
else
{
/// FIXME: log warning
}
}
else if (best_ttl_entry->destination_type == ASTTTLElement::DestinationType::DISK)
{
auto disk_ptr = data.getStoragePolicy()->getDiskByName(best_ttl_entry->destination_name);
if (disk_ptr)
{
reservation = disk_ptr->reserve(expected_size);
}
else
{
/// FIXME: log warning
}
}
}
if (!reservation)
{
reservation = data.reserveSpace(expected_size);
}
MergeTreeData::MutableDataPartPtr new_data_part =
std::make_shared<MergeTreeData::DataPart>(data, reservation->getDisk(), part_name, new_part_info);
@ -270,8 +316,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithPa
for (const auto & [name, ttl_entry] : data.ttl_entries_by_name)
updateTTL(ttl_entry, new_data_part->ttl_infos, new_data_part->ttl_infos.columns_ttl[name], block);
for (const auto & [expression, ttl_entry] : data.move_ttl_entries_by_name)
updateTTL(ttl_entry, new_data_part->ttl_infos, new_data_part->ttl_infos.moves_ttl[expression], block);
new_data_part->ttl_infos.update(move_ttl_infos);
/// This effectively chooses minimal compression method:
/// either default lz4 or compression method with zero thresholds on absolute and relative part size.

View File

@ -113,16 +113,16 @@ bool MergeTreePartsMover::selectPartsForMove(
const auto ttl_entries_end = part->storage.move_ttl_entries_by_name.end();
auto best_ttl_entry_it = ttl_entries_end;
time_t max_max_ttl = 0;
time_t max_min_ttl = 0;
for (auto & [name, ttl_info] : part->ttl_infos.moves_ttl)
{
auto move_ttl_entry_it = part->storage.move_ttl_entries_by_name.find(name);
if (move_ttl_entry_it != part->storage.move_ttl_entries_by_name.end())
{
if (ttl_info.max < current_time && max_max_ttl < ttl_info.max)
if (ttl_info.min > current_time && max_min_ttl < ttl_info.min)
{
best_ttl_entry_it = move_ttl_entry_it;
max_max_ttl = ttl_info.max;
max_min_ttl = ttl_info.min;
}
}
}
@ -143,7 +143,7 @@ bool MergeTreePartsMover::selectPartsForMove(
}
else
{
/// FIXME: log error
/// FIXME: log warning?
}
}
else if (move_ttl_entry.destination_type == ASTTTLElement::DestinationType::DISK)
@ -160,7 +160,7 @@ bool MergeTreePartsMover::selectPartsForMove(
}
else
{
/// FIXME: log error
/// FIXME: log warning?
}
}
}