CLICKHOUSE-3880: Fix replace partition logic inside replicated merge tree

alesapin 2018-08-06 15:59:51 +03:00
parent b321963b35
commit 425918afe1
4 changed files with 31 additions and 9 deletions

dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp

@@ -666,8 +666,25 @@ ReplicatedMergeTreeQueue::StringSet ReplicatedMergeTreeQueue::moveSiblingPartsFo
     return parts_for_merge;
 }
 
-void ReplicatedMergeTreeQueue::removePartProducingOpsInRange(zkutil::ZooKeeperPtr zookeeper, const MergeTreePartInfo & part_info)
+bool ReplicatedMergeTreeQueue::checkReplaceRangeCanBeRemoved(const MergeTreePartInfo & part_info, const LogEntryPtr entry_ptr, const ReplicatedMergeTreeLogEntryData & current) const
+{
+    if (entry_ptr->type != LogEntry::REPLACE_RANGE)
+        return false;
+
+    if (current.type != LogEntry::REPLACE_RANGE && current.type != LogEntry::DROP_RANGE)
+        return false;
+
+    if (entry_ptr->replace_range_entry != nullptr && entry_ptr->replace_range_entry == current.replace_range_entry) /// same partition, don't want to drop ourselves
+        return false;
+
+    for (const String & new_part_name : entry_ptr->replace_range_entry->new_part_names)
+        if (!part_info.contains(MergeTreePartInfo::fromPartName(new_part_name, format_version)))
+            return false;
+
+    return true;
+}
+
+void ReplicatedMergeTreeQueue::removePartProducingOpsInRange(zkutil::ZooKeeperPtr zookeeper, const MergeTreePartInfo & part_info, const ReplicatedMergeTreeLogEntryData & current)
 {
     Queue to_wait;
     size_t removed_entries = 0;
@@ -680,8 +697,9 @@ void ReplicatedMergeTreeQueue::removePartProducingOpsInRange(zkutil::ZooKeeperPt
     {
         auto type = (*it)->type;
 
-        if ((type == LogEntry::GET_PART || type == LogEntry::MERGE_PARTS || type == LogEntry::MUTATE_PART)
-            && part_info.contains(MergeTreePartInfo::fromPartName((*it)->new_part_name, format_version)))
+        if (((type == LogEntry::GET_PART || type == LogEntry::MERGE_PARTS || type == LogEntry::MUTATE_PART)
+             && part_info.contains(MergeTreePartInfo::fromPartName((*it)->new_part_name, format_version)))
+            || checkReplaceRangeCanBeRemoved(part_info, *it, current))
         {
             if ((*it)->currently_executing)
                 to_wait.push_back(*it);
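
Taken together, the two hunks above implement one rule: a queued REPLACE_RANGE entry may be removed once a later DROP_RANGE or REPLACE_RANGE covers every part the old entry would produce, but never when the entry would be removing itself. A minimal, self-contained sketch of the covering rule, using toy Python types rather than the real MergeTreePartInfo (PartRange and replace_range_can_be_removed are illustrative names only):

from dataclasses import dataclass
from typing import List

@dataclass
class PartRange:
    """Toy stand-in for MergeTreePartInfo: a part owns a contiguous
    range of block numbers inside a single partition."""
    partition_id: str
    min_block: int
    max_block: int

    def contains(self, other: "PartRange") -> bool:
        # A range covers another only within the same partition, and only
        # if it fully encloses the other's block interval.
        return (self.partition_id == other.partition_id
                and self.min_block <= other.min_block
                and self.max_block >= other.max_block)

def replace_range_can_be_removed(drop_range: PartRange, new_parts: List[PartRange]) -> bool:
    """The covering rule above: remove a stale REPLACE_RANGE only if every
    part it would produce is covered by the incoming drop range."""
    return all(drop_range.contains(p) for p in new_parts)

drop = PartRange("201808", 0, 100)
assert replace_range_can_be_removed(drop, [PartRange("201808", 10, 20), PartRange("201808", 30, 40)])
assert not replace_range_can_be_removed(drop, [PartRange("201808", 90, 120)])  # sticks out -> keep the entry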

dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h

@@ -140,6 +140,8 @@ private:
     /// Notify subscribers about queue change
     void notifySubscribers(size_t new_queue_size);
 
+    /// Checks whether entry_ptr is a REPLACE_RANGE entry that can be removed from the queue because the current entry covers it
+    bool checkReplaceRangeCanBeRemoved(const MergeTreePartInfo & part_info, const LogEntryPtr entry_ptr, const ReplicatedMergeTreeLogEntryData & current) const;
 
     /// Ensures that only one thread is simultaneously updating mutations.
     std::mutex update_mutations_mutex;

@@ -249,7 +251,7 @@ public:
     /** Remove the action from the queue with the parts covered by part_name (from ZK and from the RAM).
      * And also wait for the completion of their execution, if they are now being executed.
      */
-    void removePartProducingOpsInRange(zkutil::ZooKeeperPtr zookeeper, const MergeTreePartInfo & part_info);
+    void removePartProducingOpsInRange(zkutil::ZooKeeperPtr zookeeper, const MergeTreePartInfo & part_info, const ReplicatedMergeTreeLogEntryData & current);
 
     /** Throws an exception if there are currently executing entries in the range.
      */

dbms/src/Storages/StorageReplicatedMergeTree.cpp

@@ -1619,7 +1619,7 @@ bool StorageReplicatedMergeTree::executeFetch(LogEntry & entry)
 
 void StorageReplicatedMergeTree::executeDropRange(const LogEntry & entry)
 {
     auto drop_range_info = MergeTreePartInfo::fromPartName(entry.new_part_name, data.format_version);
-    queue.removePartProducingOpsInRange(getZooKeeper(), drop_range_info);
+    queue.removePartProducingOpsInRange(getZooKeeper(), drop_range_info, entry);
 
     LOG_DEBUG(log, (entry.detach ? "Detaching" : "Removing") << " parts.");

@@ -1728,7 +1728,7 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry)
     /// Range with only one block has special meaning ATTACH PARTITION
     bool replace = drop_range.getBlocksCount() > 1;
 
-    queue.removePartProducingOpsInRange(getZooKeeper(), drop_range);
+    queue.removePartProducingOpsInRange(getZooKeeper(), drop_range, entry);
 
     struct PartDescription
     {

@@ -4477,7 +4477,7 @@ void StorageReplicatedMergeTree::replacePartitionFrom(const StoragePtr & source_
     }
 
     /// We are almost ready to commit changes, remove fetches and merges from drop range
-    queue.removePartProducingOpsInRange(zookeeper, drop_range);
+    queue.removePartProducingOpsInRange(zookeeper, drop_range, entry);
 
     /// Remove deduplication block_ids of replacing parts
     if (replace)
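
All three call sites now forward the log entry that triggered the removal, which is what lets checkReplaceRangeCanBeRemoved compare replace_range_entry pointers and refuse to drop the entry currently being executed. A rough sketch of that self-exclusion test, with hypothetical names (Entry, can_remove) and the covering check deliberately omitted:

class Entry:
    """Hypothetical queue entry; `payload` plays the role of the shared
    replace_range_entry pointer in the real code."""
    def __init__(self, kind, payload=None):
        self.kind = kind
        self.payload = payload

def can_remove(candidate: Entry, current: Entry) -> bool:
    if candidate.kind != "REPLACE_RANGE":
        return False
    if current.kind not in ("REPLACE_RANGE", "DROP_RANGE"):
        return False
    # Identity, not equality: the entry currently executing shares the
    # same underlying object, and must never remove itself.
    if candidate.payload is not None and candidate.payload is current.payload:
        return False
    return True  # the real code additionally requires full part coverage

running = Entry("REPLACE_RANGE", payload=object())
assert not can_remove(running, running)                        # don't drop ourselves
assert can_remove(Entry("REPLACE_RANGE", object()), running)   # a stale sibling can go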

dbms/tests/integration/test_replace_partition/test.py

@@ -104,6 +104,7 @@ def test_drop_failover(drop_failover):
         time.sleep(1)
         counter += 1
     assert 'Not found part' not in node4.query("select last_exception from system.replication_queue where type = 'REPLACE_RANGE'")
+    assert node4.query("SELECT id FROM test_table order by id") == ''
 
 node5 = cluster.add_instance('node5', main_configs=['configs/remote_servers.xml'], with_zookeeper=True)
 node6 = cluster.add_instance('node6', main_configs=['configs/remote_servers.xml'], with_zookeeper=True)

@@ -154,8 +155,9 @@ def test_replace_after_replace_failover(replace_after_replace_failover):
     # connection restored
     counter = 0
     while counter < 10:  # won't last forever
-        if 'Not found part' not in node4.query("select last_exception from system.replication_queue where type = 'REPLACE_RANGE'"):
+        if 'Not found part' not in node6.query("select last_exception from system.replication_queue where type = 'REPLACE_RANGE'"):
             break
         time.sleep(1)
         counter += 1
-    assert 'Not found part' not in node4.query("select last_exception from system.replication_queue where type = 'REPLACE_RANGE'")
+    assert 'Not found part' not in node6.query("select last_exception from system.replication_queue where type = 'REPLACE_RANGE'")
+    assert node6.query("SELECT id FROM test_table order by id") == '333\n'
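
Both tests share the same poll-then-assert shape: retry the system.replication_queue query for up to ten seconds, then assert on the final state. If more tests adopt it, the loop could be factored into a helper along these lines (wait_until is a hypothetical addition, not part of the existing harness):

import time

def wait_until(predicate, timeout=10, interval=1):
    """Poll `predicate` until it returns True or `timeout` seconds elapse;
    return the last result so the caller can assert on it."""
    deadline = time.time() + timeout
    while time.time() < deadline:
        if predicate():
            return True
        time.sleep(interval)
    return predicate()

# Usage mirroring the loop above (node6 comes from the test context):
# assert wait_until(lambda: 'Not found part' not in node6.query(
#     "select last_exception from system.replication_queue where type = 'REPLACE_RANGE'"))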