Update ReplicatedMergeTreeQuorumAddedParts.h

This commit is contained in:
Vadim 2018-09-21 18:20:31 +03:00 committed by GitHub
parent 17aa0356e5
commit 8c413e3f1f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -14,113 +14,113 @@ namespace DB
struct ReplicatedMergeTreeQuorumAddedParts struct ReplicatedMergeTreeQuorumAddedParts
{ {
using PartitionIdToMaxBlock = std::unordered_map<String, Int64>; using PartitionIdToMaxBlock = std::unordered_map<String, Int64>;
using PartitionIdToPartName = std::unordered_map<String, String>; using PartitionIdToPartName = std::unordered_map<String, String>;
PartitionIdToPartName added_parts; PartitionIdToPartName added_parts;
MergeTreeDataFormatVersion format_version; MergeTreeDataFormatVersion format_version;
ReplicatedMergeTreeQuorumAddedParts(const std::string & added_parts_str, MergeTreeDataFormatVersion format_version_) ReplicatedMergeTreeQuorumAddedParts(const std::string & added_parts_str, MergeTreeDataFormatVersion format_version_)
: format_version(format_version_) : format_version(format_version_)
{ {
fromString(added_parts_str); fromString(added_parts_str);
} }
/// Write new parts in buffer with added parts. /// Write new parts in buffer with added parts.
void write(WriteBuffer & out) void write(WriteBuffer & out)
{ {
out << "version: " << 2 << '\n'; out << "version: " << 2 << '\n';
out << "parts count: " << added_parts.size() << '\n'; out << "parts count: " << added_parts.size() << '\n';
for (const auto & part : added_parts) for (const auto & part : added_parts)
out << part.first << '\t' << part.second << '\n'; out << part.first << '\t' << part.second << '\n';
} }
PartitionIdToMaxBlock getMaxInsertedBlocks() PartitionIdToMaxBlock getMaxInsertedBlocks()
{ {
PartitionIdToMaxBlock max_added_blocks; PartitionIdToMaxBlock max_added_blocks;
for (const auto & part : added_parts) for (const auto & part : added_parts)
{ {
auto partition_info = MergeTreePartInfo::fromPartName(part.second, format_version); auto partition_info = MergeTreePartInfo::fromPartName(part.second, format_version);
max_added_blocks[part.first] = partition_info.max_block; max_added_blocks[part.first] = partition_info.max_block;
} }
return max_added_blocks;
}
void read(ReadBuffer & in) return max_added_blocks;
{ }
if (checkString("version: ", in))
{
size_t version;
readText(version, in); void read(ReadBuffer & in)
assertChar('\n', in); {
if (checkString("version: ", in))
{
size_t version;
if (version == 2) readText(version, in);
added_parts = read_v2(in); assertChar('\n', in);
}
else
added_parts = read_v1(in);
}
/// Read added bloks when node in ZooKeeper supports only one partition. if (version == 2)
PartitionIdToPartName read_v1(ReadBuffer & in) added_parts = read_v2(in);
{ }
PartitionIdToPartName parts_in_quorum; else
added_parts = read_v1(in);
}
std::string partition_name; /// Read added bloks when node in ZooKeeper supports only one partition.
PartitionIdToPartName read_v1(ReadBuffer & in)
{
PartitionIdToPartName parts_in_quorum;
readText(partition_name, in); std::string partition_name;
auto partition_info = MergeTreePartInfo::fromPartName(partition_name, format_version); readText(partition_name, in);
auto partition_info = MergeTreePartInfo::fromPartName(partition_name, format_version);
parts_in_quorum[partition_info.partition_id] = partition_name; parts_in_quorum[partition_info.partition_id] = partition_name;
return parts_in_quorum; return parts_in_quorum;
} }
/// Read blocks when node in ZooKeeper suppors multiple partitions. /// Read blocks when node in ZooKeeper suppors multiple partitions.
PartitionIdToPartName read_v2(ReadBuffer & in) PartitionIdToPartName read_v2(ReadBuffer & in)
{ {
assertString("parts count: ", in); assertString("parts count: ", in);
PartitionIdToPartName parts_in_quorum; PartitionIdToPartName parts_in_quorum;
uint64_t parts_count; uint64_t parts_count;
readText(parts_count, in); readText(parts_count, in);
assertChar('\n', in); assertChar('\n', in);
for (uint64_t i = 0; i < parts_count; ++i)
{
std::string partition_id;
std::string part_name;
readText(partition_id, in); for (uint64_t i = 0; i < parts_count; ++i)
assertChar('\t', in); {
readText(part_name, in); std::string partition_id;
assertChar('\n', in); std::string part_name;
parts_in_quorum[partition_id] = part_name; readText(partition_id, in);
} assertChar('\t', in);
return parts_in_quorum; readText(part_name, in);
} assertChar('\n', in);
void fromString(const std::string & str) parts_in_quorum[partition_id] = part_name;
{ }
if (str.empty()) return parts_in_quorum;
return; }
ReadBufferFromString in(str);
read(in);
}
std::string toString() void fromString(const std::string & str)
{ {
WriteBufferFromOwnString out; if (str.empty())
write(out); return;
return out.str(); ReadBufferFromString in(str);
} read(in);
}
std::string toString()
{
WriteBufferFromOwnString out;
write(out);
return out.str();
}
}; };