More consistent metadata usage

This commit is contained in:
alesapin 2020-10-05 19:41:46 +03:00
parent a49591b250
commit 8ec58c17f3
7 changed files with 36 additions and 23 deletions

View File

@ -276,7 +276,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(
ReadBufferFromString ttl_infos_buffer(ttl_infos_string); ReadBufferFromString ttl_infos_buffer(ttl_infos_string);
assertString("ttl format version: 1\n", ttl_infos_buffer); assertString("ttl format version: 1\n", ttl_infos_buffer);
ttl_infos.read(ttl_infos_buffer); ttl_infos.read(ttl_infos_buffer);
reservation = data.reserveSpacePreferringTTLRules(sum_files_size, ttl_infos, std::time(nullptr), 0, true); reservation = data.reserveSpacePreferringTTLRules(metadata_snapshot, sum_files_size, ttl_infos, std::time(nullptr), 0, true);
} }
else else
reservation = data.reserveSpace(sum_files_size); reservation = data.reserveSpace(sum_files_size);

View File

@ -3037,28 +3037,31 @@ ReservationPtr MergeTreeData::tryReserveSpace(UInt64 expected_size, SpacePtr spa
return space->reserve(expected_size); return space->reserve(expected_size);
} }
ReservationPtr MergeTreeData::reserveSpacePreferringTTLRules(UInt64 expected_size, ReservationPtr MergeTreeData::reserveSpacePreferringTTLRules(
const IMergeTreeDataPart::TTLInfos & ttl_infos, const StorageMetadataPtr & metadata_snapshot,
time_t time_of_move, UInt64 expected_size,
size_t min_volume_index, const IMergeTreeDataPart::TTLInfos & ttl_infos,
bool is_insert) const time_t time_of_move,
size_t min_volume_index,
bool is_insert) const
{ {
expected_size = std::max(RESERVATION_MIN_ESTIMATION_SIZE, expected_size); expected_size = std::max(RESERVATION_MIN_ESTIMATION_SIZE, expected_size);
ReservationPtr reservation = tryReserveSpacePreferringTTLRules(expected_size, ttl_infos, time_of_move, min_volume_index, is_insert); ReservationPtr reservation = tryReserveSpacePreferringTTLRules(metadata_snapshot, expected_size, ttl_infos, time_of_move, min_volume_index, is_insert);
return checkAndReturnReservation(expected_size, std::move(reservation)); return checkAndReturnReservation(expected_size, std::move(reservation));
} }
ReservationPtr MergeTreeData::tryReserveSpacePreferringTTLRules(UInt64 expected_size, ReservationPtr MergeTreeData::tryReserveSpacePreferringTTLRules(
const IMergeTreeDataPart::TTLInfos & ttl_infos, const StorageMetadataPtr & metadata_snapshot,
time_t time_of_move, UInt64 expected_size,
size_t min_volume_index, const IMergeTreeDataPart::TTLInfos & ttl_infos,
bool is_insert) const time_t time_of_move,
size_t min_volume_index,
bool is_insert) const
{ {
expected_size = std::max(RESERVATION_MIN_ESTIMATION_SIZE, expected_size); expected_size = std::max(RESERVATION_MIN_ESTIMATION_SIZE, expected_size);
auto metadata_snapshot = getInMemoryMetadataPtr();
ReservationPtr reservation; ReservationPtr reservation;
auto move_ttl_entry = selectTTLDescriptionForTTLInfos(metadata_snapshot->getMoveTTLs(), ttl_infos.moves_ttl, time_of_move, true); auto move_ttl_entry = selectTTLDescriptionForTTLInfos(metadata_snapshot->getMoveTTLs(), ttl_infos.moves_ttl, time_of_move, true);

View File

@ -632,6 +632,7 @@ public:
/// Reserves space at least 1MB preferring best destination according to `ttl_infos`. /// Reserves space at least 1MB preferring best destination according to `ttl_infos`.
ReservationPtr reserveSpacePreferringTTLRules( ReservationPtr reserveSpacePreferringTTLRules(
const StorageMetadataPtr & metadata_snapshot,
UInt64 expected_size, UInt64 expected_size,
const IMergeTreeDataPart::TTLInfos & ttl_infos, const IMergeTreeDataPart::TTLInfos & ttl_infos,
time_t time_of_move, time_t time_of_move,
@ -639,6 +640,7 @@ public:
bool is_insert = false) const; bool is_insert = false) const;
ReservationPtr tryReserveSpacePreferringTTLRules( ReservationPtr tryReserveSpacePreferringTTLRules(
const StorageMetadataPtr & metadata_snapshot,
UInt64 expected_size, UInt64 expected_size,
const IMergeTreeDataPart::TTLInfos & ttl_infos, const IMergeTreeDataPart::TTLInfos & ttl_infos,
time_t time_of_move, time_t time_of_move,

View File

@ -182,6 +182,10 @@ std::optional<TTLDescription> selectTTLDescriptionForTTLInfos(const TTLDescripti
for (auto ttl_entry_it = descriptions.begin(); ttl_entry_it != descriptions.end(); ++ttl_entry_it) for (auto ttl_entry_it = descriptions.begin(); ttl_entry_it != descriptions.end(); ++ttl_entry_it)
{ {
auto ttl_info_it = ttl_info_map.find(ttl_entry_it->result_column); auto ttl_info_it = ttl_info_map.find(ttl_entry_it->result_column);
if (ttl_info_it == ttl_info_map.end())
continue;
time_t ttl_time; time_t ttl_time;
if (use_max) if (use_max)
@ -190,8 +194,7 @@ std::optional<TTLDescription> selectTTLDescriptionForTTLInfos(const TTLDescripti
ttl_time = ttl_info_it->second.min; ttl_time = ttl_info_it->second.min;
/// Prefer TTL rule which went into action last. /// Prefer TTL rule which went into action last.
if (ttl_info_it != ttl_info_map.end() if (ttl_time <= current_time
&& ttl_time <= current_time
&& best_ttl_time <= ttl_time) && best_ttl_time <= ttl_time)
{ {
best_entry_it = ttl_entry_it; best_entry_it = ttl_entry_it;

View File

@ -237,7 +237,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithPa
updateTTL(ttl_entry, move_ttl_infos, move_ttl_infos.moves_ttl[ttl_entry.result_column], block, false); updateTTL(ttl_entry, move_ttl_infos, move_ttl_infos.moves_ttl[ttl_entry.result_column], block, false);
NamesAndTypesList columns = metadata_snapshot->getColumns().getAllPhysical().filter(block.getNames()); NamesAndTypesList columns = metadata_snapshot->getColumns().getAllPhysical().filter(block.getNames());
ReservationPtr reservation = data.reserveSpacePreferringTTLRules(expected_size, move_ttl_infos, time(nullptr), 0, true); ReservationPtr reservation = data.reserveSpacePreferringTTLRules(metadata_snapshot, expected_size, move_ttl_infos, time(nullptr), 0, true);
VolumePtr volume = data.getStoragePolicy()->getVolume(0); VolumePtr volume = data.getStoragePolicy()->getVolume(0);
auto new_data_part = data.createPart( auto new_data_part = data.createPart(

View File

@ -286,7 +286,12 @@ struct CurrentlyMergingPartsTagger
StorageMergeTree & storage; StorageMergeTree & storage;
public: public:
CurrentlyMergingPartsTagger(FutureMergedMutatedPart & future_part_, size_t total_size, StorageMergeTree & storage_, bool is_mutation) CurrentlyMergingPartsTagger(
FutureMergedMutatedPart & future_part_,
size_t total_size,
StorageMergeTree & storage_,
const StorageMetadataPtr & metadata_snapshot,
bool is_mutation)
: future_part(future_part_), storage(storage_) : future_part(future_part_), storage(storage_)
{ {
/// Assume mutex is already locked, because this method is called from mergeTask. /// Assume mutex is already locked, because this method is called from mergeTask.
@ -304,7 +309,7 @@ public:
max_volume_index = std::max(max_volume_index, storage.getStoragePolicy()->getVolumeIndexByDisk(part_ptr->volume->getDisk())); max_volume_index = std::max(max_volume_index, storage.getStoragePolicy()->getVolumeIndexByDisk(part_ptr->volume->getDisk()));
} }
reserved_space = storage.tryReserveSpacePreferringTTLRules(total_size, ttl_infos, time(nullptr), max_volume_index); reserved_space = storage.tryReserveSpacePreferringTTLRules(metadata_snapshot, total_size, ttl_infos, time(nullptr), max_volume_index);
} }
if (!reserved_space) if (!reserved_space)
{ {
@ -715,7 +720,7 @@ bool StorageMergeTree::merge(
return false; return false;
} }
merging_tagger.emplace(future_part, MergeTreeDataMergerMutator::estimateNeededDiskSpace(future_part.parts), *this, false); merging_tagger.emplace(future_part, MergeTreeDataMergerMutator::estimateNeededDiskSpace(future_part.parts), *this, metadata_snapshot, false);
auto table_id = getStorageID(); auto table_id = getStorageID();
merge_entry = global_context.getMergeList().insert(table_id.database_name, table_id.table_name, future_part); merge_entry = global_context.getMergeList().insert(table_id.database_name, table_id.table_name, future_part);
} }
@ -856,7 +861,7 @@ bool StorageMergeTree::tryMutatePart()
future_part.name = part->getNewName(new_part_info); future_part.name = part->getNewName(new_part_info);
future_part.type = part->getType(); future_part.type = part->getType();
tagger.emplace(future_part, MergeTreeDataMergerMutator::estimateNeededDiskSpace({part}), *this, true); tagger.emplace(future_part, MergeTreeDataMergerMutator::estimateNeededDiskSpace({part}), *this, metadata_snapshot, true);
break; break;
} }
} }

View File

@ -1416,11 +1416,11 @@ bool StorageReplicatedMergeTree::tryExecuteMerge(const LogEntry & entry)
ttl_infos.update(part_ptr->ttl_infos); ttl_infos.update(part_ptr->ttl_infos);
max_volume_index = std::max(max_volume_index, getStoragePolicy()->getVolumeIndexByDisk(part_ptr->volume->getDisk())); max_volume_index = std::max(max_volume_index, getStoragePolicy()->getVolumeIndexByDisk(part_ptr->volume->getDisk()));
} }
ReservationPtr reserved_space = reserveSpacePreferringTTLRules(estimated_space_for_merge,
ttl_infos, time(nullptr), max_volume_index);
auto table_lock = lockForShare(RWLockImpl::NO_QUERY, storage_settings_ptr->lock_acquire_timeout_for_background_operations); auto table_lock = lockForShare(RWLockImpl::NO_QUERY, storage_settings_ptr->lock_acquire_timeout_for_background_operations);
StorageMetadataPtr metadata_snapshot = getInMemoryMetadataPtr(); StorageMetadataPtr metadata_snapshot = getInMemoryMetadataPtr();
ReservationPtr reserved_space = reserveSpacePreferringTTLRules(
metadata_snapshot, estimated_space_for_merge, ttl_infos, time(nullptr), max_volume_index);
FutureMergedMutatedPart future_merged_part(parts, entry.new_part_type); FutureMergedMutatedPart future_merged_part(parts, entry.new_part_type);
if (future_merged_part.name != entry.new_part_name) if (future_merged_part.name != entry.new_part_name)