Write with recompression TTL

This commit is contained in:
alesapin 2020-08-31 16:29:31 +03:00
parent 42c210fcba
commit adc18f4d3f
2 changed files with 38 additions and 5 deletions

View File

@ -67,6 +67,18 @@ void MergeTreeDataPartTTLInfos::read(ReadBuffer & in)
moves_ttl.emplace(expression, ttl_info);
}
}
if (json.has("recompression"))
{
const JSON & moves = json["recompression"];
for (auto move : moves) // NOLINT
{
MergeTreeDataPartTTLInfo ttl_info;
ttl_info.min = move["min"].getUInt();
ttl_info.max = move["max"].getUInt();
String expression = move["expression"].getString();
recompression_ttl.emplace(expression, ttl_info);
}
}
}
@ -122,6 +134,28 @@ void MergeTreeDataPartTTLInfos::write(WriteBuffer & out) const
}
writeString("]", out);
}
if (!recompression_ttl.empty())
{
if (!moves_ttl.empty() || !columns_ttl.empty() || table_ttl.min)
writeString(",", out);
writeString(R"("recompression":[)", out);
for (auto it = recompression_ttl.begin(); it != recompression_ttl.end(); ++it)
{
if (it != recompression_ttl.begin())
writeString(",", out);
writeString(R"({"expression":)", out);
writeString(doubleQuoteString(it->first), out);
writeString(R"(,"min":)", out);
writeIntText(it->second.min, out);
writeString(R"(,"max":)", out);
writeIntText(it->second.max, out);
writeString("}", out);
}
writeString("]", out);
}
writeString("}", out);
}

View File

@ -234,11 +234,6 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithPa
for (const auto & ttl_entry : move_ttl_entries)
updateTTL(ttl_entry, move_ttl_infos, move_ttl_infos.moves_ttl[ttl_entry.result_column], block, false);
const auto & recompression_ttl_entries = metadata_snapshot->getRecompressionTTLs();
for (const auto & ttl_entry : recompression_ttl_entries)
updateTTL(ttl_entry, move_ttl_infos, move_ttl_infos.moves_ttl[ttl_entry.result_column], block, false);
NamesAndTypesList columns = metadata_snapshot->getColumns().getAllPhysical().filter(block.getNames());
ReservationPtr reservation = data.reserveSpacePreferringTTLRules(expected_size, move_ttl_infos, time(nullptr));
VolumePtr volume = data.getStoragePolicy()->getVolume(0);
@ -303,6 +298,10 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithPa
for (const auto & [name, ttl_entry] : metadata_snapshot->getColumnTTLs())
updateTTL(ttl_entry, new_data_part->ttl_infos, new_data_part->ttl_infos.columns_ttl[name], block, true);
const auto & recompression_ttl_entries = metadata_snapshot->getRecompressionTTLs();
for (const auto & ttl_entry : recompression_ttl_entries)
updateTTL(ttl_entry, new_data_part->ttl_infos, new_data_part->ttl_infos.recompression_ttl[ttl_entry.result_column], block, false);
new_data_part->ttl_infos.update(move_ttl_infos);
/// This effectively chooses minimal compression method: