fix zookeeper format version 4

This commit is contained in:
Yuri Dyachenko 2017-04-17 18:14:56 +03:00 committed by alexey-milovidov
parent 5a8344382b
commit 8fb2b7ee3d
3 changed files with 9 additions and 9 deletions

View File

@ -27,8 +27,7 @@ void ReplicatedMergeTreeLogEntryData::writeText(WriteBuffer & out) const
for (const String & s : parts_to_merge)
out << s << '\n';
out << "into\n" << new_part_name;
if (deduplicate)
out << "deduplicate\n";
out << "\ndeduplicate: " << deduplicate;
break;
case DROP_RANGE:
@ -101,10 +100,8 @@ void ReplicatedMergeTreeLogEntryData::readText(ReadBuffer & in)
parts_to_merge.push_back(s);
}
in >> new_part_name;
std::string flag;
in >> flag;
if (flag == "deduplicate")
deduplicate = true;
if (format_version >= 4)
in >> "\ndeduplicate: " >> deduplicate;
}
else if (type_str == "drop" || type_str == "detach")
{

View File

@ -1758,6 +1758,7 @@ void StorageReplicatedMergeTree::mergeSelectingThread()
setThreadName("ReplMTMergeSel");
LOG_DEBUG(log, "Merge selecting thread started");
bool deduplicate = false; /// TODO: read deduplicate option from table config
bool need_pull = true;
MemoizedPartsThatCouldBeMerged memoized_parts_that_could_be_merged;
@ -1808,7 +1809,7 @@ void StorageReplicatedMergeTree::mergeSelectingThread()
parts, merged_name, false,
max_parts_size_for_merge,
can_merge)
&& createLogEntryToMergeParts(parts, merged_name))
&& createLogEntryToMergeParts(parts, merged_name, deduplicate))
{
success = true;
need_pull = true;
@ -1832,7 +1833,7 @@ void StorageReplicatedMergeTree::mergeSelectingThread()
bool StorageReplicatedMergeTree::createLogEntryToMergeParts(
const MergeTreeData::DataPartsVector & parts, const String & merged_name, ReplicatedMergeTreeLogEntryData * out_log_entry)
const MergeTreeData::DataPartsVector & parts, const String & merged_name, bool deduplicate, ReplicatedMergeTreeLogEntryData * out_log_entry)
{
auto zookeeper = getZooKeeper();
@ -1861,6 +1862,7 @@ bool StorageReplicatedMergeTree::createLogEntryToMergeParts(
entry.type = LogEntry::MERGE_PARTS;
entry.source_replica = replica_name;
entry.new_part_name = merged_name;
entry.deduplicate = deduplicate;
entry.create_time = time(0);
for (const auto & part : parts)
@ -2496,7 +2498,7 @@ bool StorageReplicatedMergeTree::optimize(const String & partition, bool final,
if (!selected)
return false;
if (!createLogEntryToMergeParts(parts, merged_name, &merge_entry))
if (!createLogEntryToMergeParts(parts, merged_name, deduplicate, &merge_entry))
return false;
}

View File

@ -425,6 +425,7 @@ private:
bool createLogEntryToMergeParts(
const MergeTreeData::DataPartsVector & parts,
const String & merged_name,
bool deduplicate,
ReplicatedMergeTreeLogEntryData * out_log_entry = nullptr);
/// Exchange parts.