better replication with compact parts

This commit is contained in:
CurtizJ 2020-02-13 17:19:43 +03:00
parent 59c4f53fec
commit 64e1883c06
4 changed files with 44 additions and 19 deletions

View File

@ -69,8 +69,24 @@ static const double DISK_USAGE_COEFFICIENT_TO_SELECT = 2;
/// because between selecting parts to merge and doing merge, amount of free space could have decreased.
static const double DISK_USAGE_COEFFICIENT_TO_RESERVE = 1.1;
void FutureMergedMutatedPart::assign(MergeTreeData::DataPartsVector parts_)
{
if (parts_.empty())
return;
size_t sum_rows = 0;
size_t sum_bytes_uncompressed = 0;
for (const auto & part : parts_)
{
sum_rows += part->rows_count;
sum_bytes_uncompressed += part->getTotalColumnsSize().data_uncompressed;
}
auto future_part_type = parts_.front()->storage.choosePartType(sum_bytes_uncompressed, sum_rows);
assign(std::move(parts_), future_part_type);
}
void FutureMergedMutatedPart::assign(MergeTreeData::DataPartsVector parts_, MergeTreeDataPartType future_part_type)
{
if (parts_.empty())
return;
@ -89,25 +105,20 @@ void FutureMergedMutatedPart::assign(MergeTreeData::DataPartsVector parts_)
UInt32 max_level = 0;
Int64 max_mutation = 0;
size_t sum_rows = 0;
size_t sum_bytes_uncompressed = 0;
for (const auto & part : parts)
{
max_level = std::max(max_level, part->info.level);
max_mutation = std::max(max_mutation, part->info.mutation);
sum_rows += part->rows_count;
sum_bytes_uncompressed += part->getTotalColumnsSize().data_uncompressed;
}
const auto & storage = parts.front()->storage;
type = storage.choosePartType(sum_bytes_uncompressed, sum_rows);
type = future_part_type;
part_info.partition_id = parts.front()->info.partition_id;
part_info.min_block = parts.front()->info.min_block;
part_info.max_block = parts.back()->info.max_block;
part_info.level = max_level + 1;
part_info.mutation = max_mutation;
if (storage.format_version < MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING)
if (parts.front()->storage.format_version < MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING)
{
DayNum min_date = DayNum(std::numeric_limits<UInt16>::max());
DayNum max_date = DayNum(std::numeric_limits<UInt16>::min());
@ -562,7 +573,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
LOG_DEBUG(log, "Merging " << parts.size() << " parts: from "
<< parts.front()->name << " to " << parts.back()->name
<< " into " << TMP_PREFIX + future_part.name);
<< " into " << TMP_PREFIX + future_part.name + " with type " + future_part.type.toString());
String part_path = data.getFullPathOnDisk(space_reservation->getDisk());
String new_part_tmp_path = part_path + TMP_PREFIX + future_part.name + "/";

View File

@ -25,12 +25,20 @@ struct FutureMergedMutatedPart
const MergeTreePartition & getPartition() const { return parts.front()->partition; }
FutureMergedMutatedPart() = default;
explicit FutureMergedMutatedPart(MergeTreeData::DataPartsVector parts_)
{
assign(std::move(parts_));
}
FutureMergedMutatedPart(MergeTreeData::DataPartsVector parts_, MergeTreeDataPartType future_part_type)
{
assign(std::move(parts_), future_part_type);
}
void assign(MergeTreeData::DataPartsVector parts_);
void assign(MergeTreeData::DataPartsVector parts_, MergeTreeDataPartType future_part_type);
void updatePath(const MergeTreeData & storage, const ReservationPtr & reservation);
};

View File

@ -4,6 +4,7 @@
#include <IO/Operators.h>
#include <IO/ReadBufferFromString.h>
#include <IO/WriteBufferFromString.h>
#include <IO/ReadHelpers.h>
namespace DB
@ -12,7 +13,7 @@ namespace DB
void ReplicatedMergeTreeLogEntryData::writeText(WriteBuffer & out) const
{
out << "format version: 5\n"
out << "format version: 4\n"
<< "create_time: " << LocalDateTime(create_time ? create_time : time(nullptr)) << "\n"
<< "source replica: " << source_replica << '\n'
<< "block_id: " << escape << block_id << '\n';
@ -29,7 +30,6 @@ void ReplicatedMergeTreeLogEntryData::writeText(WriteBuffer & out) const
out << s << '\n';
out << "into\n" << new_part_name;
out << "\ndeduplicate: " << deduplicate;
out << "\npart_type: " << new_part_type.toString();
break;
case DROP_RANGE:
@ -72,6 +72,9 @@ void ReplicatedMergeTreeLogEntryData::writeText(WriteBuffer & out) const
out << '\n';
if (new_part_type != MergeTreeDataPartType::WIDE && new_part_type != MergeTreeDataPartType::UNKNOWN)
out << "part_type: " << new_part_type.toString() << "\n";
if (quorum)
out << "quorum: " << quorum << '\n';
}
@ -83,7 +86,7 @@ void ReplicatedMergeTreeLogEntryData::readText(ReadBuffer & in)
in >> "format version: " >> format_version >> "\n";
if (format_version < 1 || format_version > 5)
if (format_version < 1 || format_version > 4)
throw Exception("Unknown ReplicatedMergeTreeLogEntry format version: " + DB::toString(format_version), ErrorCodes::UNKNOWN_FORMAT_VERSION);
if (format_version >= 2)
@ -121,12 +124,6 @@ void ReplicatedMergeTreeLogEntryData::readText(ReadBuffer & in)
in >> new_part_name;
if (format_version >= 4)
in >> "\ndeduplicate: " >> deduplicate;
if (format_version >= 5)
{
String part_type_str;
in >> "\npart_type: " >> type_str;
new_part_type.fromString(type_str);
}
}
else if (type_str == "drop" || type_str == "detach")
{
@ -161,6 +158,15 @@ void ReplicatedMergeTreeLogEntryData::readText(ReadBuffer & in)
}
in >> "\n";
if (checkString("part_type: ", in))
{
String part_type_str;
in >> type_str;
new_part_type.fromString(type_str);
in >> "\n";
}
else
new_part_type = MergeTreeDataPartType::WIDE;
/// Optional field.
if (!in.eof())

View File

@ -1076,7 +1076,7 @@ bool StorageReplicatedMergeTree::tryExecuteMerge(const LogEntry & entry)
auto table_lock = lockStructureForShare(false, RWLockImpl::NO_QUERY);
FutureMergedMutatedPart future_merged_part(parts);
FutureMergedMutatedPart future_merged_part(parts, entry.new_part_type);
if (future_merged_part.name != entry.new_part_name)
{
throw Exception("Future merged part name " + backQuote(future_merged_part.name) + " differs from part name in log entry: "