Some changes

This commit is contained in:
alesapin 2020-08-31 22:50:42 +03:00
parent b20a0bc254
commit 46f833b7df
5 changed files with 42 additions and 0 deletions

View File

@ -3055,6 +3055,36 @@ MergeTreeData::selectTTLEntryForTTLInfos(const IMergeTreeDataPart::TTLInfos & tt
return max_max_ttl ? *best_entry_it : std::optional<TTLDescription>();
}
CompressionCodecPtr MergeTreeData::getCompressionCodecForPart(size_t part_size_compressed, const IMergeTreeDataPart::TTLInfos & ttl_infos, time_t current_time) const
{
time_t max_max_ttl = 0;
TTLDescriptions::const_iterator best_entry_it;
auto metadata_snapshot = getInMemoryMetadataPtr();
const auto & recompression_ttl_entries = metadata_snapshot->getRecompressionTTLs();
for (auto ttl_entry_it = recompression_ttl_entries.begin(); ttl_entry_it != recompression_ttl_entries.end(); ++ttl_entry_it)
{
auto ttl_info_it = ttl_infos.recompression_ttl.find(ttl_entry_it->result_column);
/// Prefer TTL rule which went into action last.
if (ttl_info_it != ttl_infos.recompression_ttl.end()
&& ttl_info_it->second.max <= current_time
&& max_max_ttl <= ttl_info_it->second.max)
{
best_entry_it = ttl_entry_it;
max_max_ttl = ttl_info_it->second.max;
}
}
if (max_max_ttl)
return CompressionCodecFactory::instance().get(best_entry_it->recompression_codec, {});
return global_context.chooseCompressionCodec(
part_size_compressed,
static_cast<double>(part_size_compressed) / getTotalActiveSizeInBytes());
}
MergeTreeData::DataParts MergeTreeData::getDataParts(const DataPartStates & affordable_states) const
{
DataParts res;

View File

@ -669,6 +669,9 @@ public:
std::optional<TTLDescription> selectTTLEntryForTTLInfos(const IMergeTreeDataPart::TTLInfos & ttl_infos, time_t time_of_move) const;
CompressionCodecPtr getCompressionCodecForPart(size_t part_size_compressed, const IMergeTreeDataPart::TTLInfos & ttl_infos, time_t current_time) const;
/// Limiting parallel sends per one table, used in DataPartsExchange
std::atomic_uint current_table_sends {0};

View File

@ -284,6 +284,7 @@ bool MergeTreeDataMergerMutator::selectPartsToMerge(
current_time,
data_settings->merge_with_ttl_timeout,
data_settings->ttl_only_drop_parts);
parts_to_merge = merge_selector.select(partitions, max_total_size_to_merge);
}

View File

@ -249,6 +249,7 @@ private:
/// Stores the next TTL merge due time for each partition (used only by TTLMergeSelector)
TTLMergeSelector::PartitionIdToTTLs next_ttl_merge_times_by_partition;
/// Performing TTL merges independently for each partition guarantees that
/// there is only a limited number of TTL merges and no partition stores data, that is too stale
};

View File

@ -16,6 +16,12 @@ void MergeTreeDataPartTTLInfos::update(const MergeTreeDataPartTTLInfos & other_i
updatePartMinMaxTTL(ttl_info.min, ttl_info.max);
}
for (const auto & [name, ttl_info] : other_infos.recompression_ttl)
{
recompression_ttl[name].update(ttl_info);
updatePartMinMaxTTL(ttl_info.min, ttl_info.max);
}
for (const auto & [expression, ttl_info] : other_infos.moves_ttl)
{
moves_ttl[expression].update(ttl_info);
@ -77,6 +83,7 @@ void MergeTreeDataPartTTLInfos::read(ReadBuffer & in)
ttl_info.max = move["max"].getUInt();
String expression = move["expression"].getString();
recompression_ttl.emplace(expression, ttl_info);
updatePartMinMaxTTL(ttl_info.min, ttl_info.max);
}
}
}