fix addedparts

This commit is contained in:
VadimPE 2018-10-18 14:51:40 +03:00
parent 070eae2ae6
commit 4a93d3b836
2 changed files with 26 additions and 24 deletions

View File

@ -21,11 +21,9 @@ struct ReplicatedMergeTreeQuorumAddedParts
MergeTreeDataFormatVersion format_version;
ReplicatedMergeTreeQuorumAddedParts(const std::string & added_parts_str, MergeTreeDataFormatVersion format_version_)
ReplicatedMergeTreeQuorumAddedParts(const MergeTreeDataFormatVersion format_version_)
: format_version(format_version_)
{
fromString(added_parts_str);
}
{}
/// Write new parts in buffer with added parts.
void write(WriteBuffer & out)
@ -43,8 +41,8 @@ struct ReplicatedMergeTreeQuorumAddedParts
for (const auto & part : added_parts)
{
auto partition_info = MergeTreePartInfo::fromPartName(part.second, format_version);
max_added_blocks[part.first] = partition_info.max_block;
auto part_info = MergeTreePartInfo::fromPartName(part.second, format_version);
max_added_blocks[part.first] = part_info.max_block;
}
return max_added_blocks;
@ -71,12 +69,12 @@ struct ReplicatedMergeTreeQuorumAddedParts
{
PartitionIdToPartName parts_in_quorum;
std::string partition_name;
std::string part_name;
readText(partition_name, in);
readText(part_name, in);
auto partition_info = MergeTreePartInfo::fromPartName(partition_name, format_version);
parts_in_quorum[partition_info.partition_id] = partition_name;
auto part_info = MergeTreePartInfo::fromPartName(part_name, format_version);
parts_in_quorum[part_info.partition_id] = part_name;
return parts_in_quorum;
}
@ -109,8 +107,6 @@ struct ReplicatedMergeTreeQuorumAddedParts
void fromString(const std::string & str)
{
if (str.empty())
return;
ReadBufferFromString in(str);
read(in);
}

View File

@ -2669,7 +2669,10 @@ void StorageReplicatedMergeTree::updateQuorum(const String & part_name)
Coordination::Stat added_parts_stat;
String old_added_parts = zookeeper->get(quorum_last_part_path, &added_parts_stat);
ReplicatedMergeTreeQuorumAddedParts parts_with_quorum(old_added_parts, data.format_version);
ReplicatedMergeTreeQuorumAddedParts parts_with_quorum(data.format_version);
if (!old_added_parts.empty())
parts_with_quorum.fromString(old_added_parts);
auto partition_info = MergeTreePartInfo::fromPartName(part_name, data.format_version);
parts_with_quorum.added_parts[partition_info.partition_id] = part_name;
@ -2979,11 +2982,13 @@ BlockInputStreams StorageReplicatedMergeTree::read(
}
String added_parts_str;
zookeeper->tryGet(zookeeper_path + "/quorum/last_part", added_parts_str);
if (zookeeper->tryGet(zookeeper_path + "/quorum/last_part", added_parts_str))
{
if (!added_parts_str.empty())
{
ReplicatedMergeTreeQuorumAddedParts part_with_quorum(added_parts_str, data.format_version);
ReplicatedMergeTreeQuorumAddedParts part_with_quorum(data.format_version);
part_with_quorum.fromString(added_parts_str);
auto added_parts = part_with_quorum.added_parts;
for (const auto & added_part : added_parts)
@ -2994,6 +2999,7 @@ BlockInputStreams StorageReplicatedMergeTree::read(
for (const auto & max_block : part_with_quorum.getMaxInsertedBlocks())
max_added_blocks[max_block.first] = max_block.second;
}
}
return reader.read(column_names, query_info, context, max_block_size, num_streams, &max_added_blocks);
}