Merge pull request #6191 from svladykin/ttldropparts

TTL: Only drop altogether the expired parts and not partially prune them.
This commit is contained in:
alexey-milovidov 2019-08-27 16:54:56 +03:00 committed by GitHub
commit d12ced8099
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 45 additions and 30 deletions

View File

@ -69,22 +69,19 @@ bool TTLBlockInputStream::isTTLExpired(time_t ttl)
Block TTLBlockInputStream::readImpl()
{
/// Skip all data if table ttl is expired for part
if (storage.hasTableTTL() && isTTLExpired(old_ttl_infos.table_ttl.max))
{
rows_removed = data_part->rows_count;
return {};
}
Block block = children.at(0)->read();
if (!block)
return block;
if (storage.hasTableTTL())
{
/// Skip all data if table ttl is expired for part
if (isTTLExpired(old_ttl_infos.table_ttl.max))
{
rows_removed = data_part->rows_count;
return {};
}
if (force || isTTLExpired(old_ttl_infos.table_ttl.min))
removeRowsWithExpiredTableTTL(block);
}
if (storage.hasTableTTL() && (force || isTTLExpired(old_ttl_infos.table_ttl.min)))
removeRowsWithExpiredTableTTL(block);
removeValuesWithExpiredColumnTTL(block);
@ -94,9 +91,9 @@ Block TTLBlockInputStream::readImpl()
void TTLBlockInputStream::readSuffixImpl()
{
for (const auto & elem : new_ttl_infos.columns_ttl)
new_ttl_infos.updatePartMinTTL(elem.second.min);
new_ttl_infos.updatePartMinMaxTTL(elem.second.min, elem.second.max);
new_ttl_infos.updatePartMinTTL(new_ttl_infos.table_ttl.min);
new_ttl_infos.updatePartMinMaxTTL(new_ttl_infos.table_ttl.min, new_ttl_infos.table_ttl.max);
data_part->ttl_infos = std::move(new_ttl_infos);
data_part->empty_columns = std::move(empty_columns);

View File

@ -40,8 +40,11 @@ public:
/// Opaque pointer to avoid dependencies (it is not possible to do forward declaration of typedef).
const void * data;
/// Minimal time, when we need to delete some data from this part
/// Minimal time, when we need to delete some data from this part.
time_t min_ttl;
/// Maximum time, when we will need to drop this part altogether because all rows in it are expired.
time_t max_ttl;
};
/// Parts are belong to partitions. Only parts within same partition could be merged.

View File

@ -211,8 +211,11 @@ bool MergeTreeDataMergerMutator::selectPartsToMerge(
part_info.level = part->info.level;
part_info.data = ∂
part_info.min_ttl = part->ttl_infos.part_min_ttl;
part_info.max_ttl = part->ttl_infos.part_max_ttl;
if (part_info.min_ttl && part_info.min_ttl <= current_time)
time_t ttl = data_settings->ttl_only_drop_parts ? part_info.max_ttl : part_info.min_ttl;
if (ttl && ttl <= current_time)
has_part_with_expired_ttl = true;
partitions.back().emplace_back(part_info);
@ -239,7 +242,7 @@ bool MergeTreeDataMergerMutator::selectPartsToMerge(
/// NOTE Could allow selection of different merge strategy.
if (can_merge_with_ttl && has_part_with_expired_ttl && !ttl_merges_blocker.isCancelled())
{
merge_selector = std::make_unique<TTLMergeSelector>(current_time);
merge_selector = std::make_unique<TTLMergeSelector>(current_time, data_settings->ttl_only_drop_parts);
last_merge_with_ttl = current_time;
}
else

View File

@ -12,11 +12,11 @@ void MergeTreeDataPartTTLInfos::update(const MergeTreeDataPartTTLInfos & other_i
for (const auto & [name, ttl_info] : other_infos.columns_ttl)
{
columns_ttl[name].update(ttl_info);
updatePartMinTTL(ttl_info.min);
updatePartMinMaxTTL(ttl_info.min, ttl_info.max);
}
table_ttl.update(other_infos.table_ttl);
updatePartMinTTL(table_ttl.min);
updatePartMinMaxTTL(table_ttl.min, table_ttl.max);
}
void MergeTreeDataPartTTLInfos::read(ReadBuffer & in)
@ -37,7 +37,7 @@ void MergeTreeDataPartTTLInfos::read(ReadBuffer & in)
String name = col["name"].getString();
columns_ttl.emplace(name, ttl_info);
updatePartMinTTL(ttl_info.min);
updatePartMinMaxTTL(ttl_info.min, ttl_info.max);
}
}
if (json.has("table"))
@ -46,7 +46,7 @@ void MergeTreeDataPartTTLInfos::read(ReadBuffer & in)
table_ttl.min = table["min"].getUInt();
table_ttl.max = table["max"].getUInt();
updatePartMinTTL(table_ttl.min);
updatePartMinMaxTTL(table_ttl.min, table_ttl.max);
}
}

View File

@ -36,15 +36,19 @@ struct MergeTreeDataPartTTLInfos
std::unordered_map<String, MergeTreeDataPartTTLInfo> columns_ttl;
MergeTreeDataPartTTLInfo table_ttl;
time_t part_min_ttl = 0;
time_t part_max_ttl = 0;
void read(ReadBuffer & in);
void write(WriteBuffer & out) const;
void update(const MergeTreeDataPartTTLInfos & other_infos);
void updatePartMinTTL(time_t time)
void updatePartMinMaxTTL(time_t time_min, time_t time_max)
{
if (time && (!part_min_ttl || time < part_min_ttl))
part_min_ttl = time;
if (time_min && (!part_min_ttl || time_min < part_min_ttl))
part_min_ttl = time_min;
if (time_max && (!part_max_ttl || time_max > part_max_ttl))
part_max_ttl = time_max;
}
};

View File

@ -99,7 +99,7 @@ void updateTTL(const MergeTreeData::TTLEntry & ttl_entry, MergeTreeDataPart::TTL
else
throw Exception("Unexpected type of result ttl column", ErrorCodes::LOGICAL_ERROR);
ttl_infos.updatePartMinTTL(ttl_info.min);
ttl_infos.updatePartMinMaxTTL(ttl_info.min, ttl_info.max);
}
}

View File

@ -82,6 +82,7 @@ struct MergeTreeSettings : public SettingsCollection<MergeTreeSettings>
M(SettingUInt64, min_merge_bytes_to_use_direct_io, 10ULL * 1024 * 1024 * 1024, "Minimal amount of bytes to enable O_DIRECT in merge (0 - disabled).") \
IM(SettingUInt64, index_granularity_bytes, 10 * 1024 * 1024, "Approximate amount of bytes in single granule (0 - disabled).") \
M(SettingInt64, merge_with_ttl_timeout, 3600 * 24, "Minimal time in seconds, when merge with TTL can be repeated.") \
M(SettingBool, ttl_only_drop_parts, false, "Only drop altogether the expired parts and not partially prune them.") \
M(SettingBool, write_final_mark, 1, "Write final mark after end of column (0 - disabled, do nothing if index_granularity_bytes=0)") \
M(SettingBool, enable_mixed_granularity_parts, 0, "Enable parts with adaptive and non adaptive granularity") \
M(SettingMaxThreads, max_part_loading_threads, 0, "The number of theads to load data parts at startup.") \

View File

@ -20,9 +20,11 @@ IMergeSelector::PartsInPartition TTLMergeSelector::select(
{
for (auto it = partitions[i].begin(); it != partitions[i].end(); ++it)
{
if (it->min_ttl && (partition_to_merge_index == -1 || it->min_ttl < partition_to_merge_min_ttl))
time_t ttl = only_drop_parts ? it->max_ttl : it->min_ttl;
if (ttl && (partition_to_merge_index == -1 || ttl < partition_to_merge_min_ttl))
{
partition_to_merge_min_ttl = it->min_ttl;
partition_to_merge_min_ttl = ttl;
partition_to_merge_index = i;
best_begin = it;
}
@ -38,7 +40,9 @@ IMergeSelector::PartsInPartition TTLMergeSelector::select(
while (true)
{
if (!best_begin->min_ttl || best_begin->min_ttl > current_time
time_t ttl = only_drop_parts ? best_begin->max_ttl : best_begin->min_ttl;
if (!ttl || ttl > current_time
|| (max_total_size_to_merge && total_size > max_total_size_to_merge))
{
++best_begin;
@ -54,7 +58,9 @@ IMergeSelector::PartsInPartition TTLMergeSelector::select(
while (best_end != best_partition.end())
{
if (!best_end->min_ttl || best_end->min_ttl > current_time
time_t ttl = only_drop_parts ? best_end->max_ttl : best_end->min_ttl;
if (!ttl || ttl > current_time
|| (max_total_size_to_merge && total_size > max_total_size_to_merge))
break;

View File

@ -14,13 +14,14 @@ namespace DB
class TTLMergeSelector : public IMergeSelector
{
public:
explicit TTLMergeSelector(time_t current_time_) : current_time(current_time_) {}
explicit TTLMergeSelector(time_t current_time_, bool only_drop_parts_) : current_time(current_time_), only_drop_parts(only_drop_parts_) {}
PartsInPartition select(
const Partitions & partitions,
const size_t max_total_size_to_merge) override;
private:
time_t current_time;
bool only_drop_parts;
};
}