diff --git a/src/Storages/MergeTree/IMergeTreeDataPart.cpp b/src/Storages/MergeTree/IMergeTreeDataPart.cpp index 85420cabb8d..5ed4987a6d5 100644 --- a/src/Storages/MergeTree/IMergeTreeDataPart.cpp +++ b/src/Storages/MergeTree/IMergeTreeDataPart.cpp @@ -34,6 +34,8 @@ #include #include +#include + namespace CurrentMetrics { @@ -1525,6 +1527,10 @@ bool IMergeTreeDataPart::assertHasValidVersionMetadata() const size_t file_size = getDataPartStorage().getFileSize(TXN_VERSION_METADATA_FILE_NAME); auto buf = getDataPartStorage().readFile(TXN_VERSION_METADATA_FILE_NAME, ReadSettings().adjustBufferSize(file_size), file_size, std::nullopt); + /// FIXME https://github.com/ClickHouse/ClickHouse/issues/48465 + if (dynamic_cast(buf.get())) + return true; + readStringUntilEOF(content, *buf); ReadBufferFromString str_buf{content}; VersionMetadata file; diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp index c8ce55f9600..623210ae04c 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp @@ -1147,7 +1147,8 @@ void ReplicatedMergeTreeQueue::removePartProducingOpsInRange( *it, /* is_successful = */ false, min_unprocessed_insert_time_changed, max_processed_insert_time_changed, lock); - (*it)->removed_by_other_entry = true; + LogEntryPtr removing_entry = std::move(*it); /// Make it live a bit longer + removing_entry->removed_by_other_entry = true; it = queue.erase(it); notifySubscribers(queue.size(), &znode_name); ++removed_entries; @@ -2491,6 +2492,7 @@ ReplicatedMergeTreeQueue::addSubscriber(ReplicatedMergeTreeQueue::SubscriberCall || std::find(lightweight_entries.begin(), lightweight_entries.end(), entry->type) != lightweight_entries.end()) out_entry_names.insert(entry->znode_name); } + LOG_TEST(log, "Waiting for {} entries to be processed: {}", out_entry_names.size(), fmt::join(out_entry_names, ", ")); } auto it = subscribers.emplace(subscribers.end(), std::move(callback)); diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index cbfe3f8cab2..e538b4fbe6c 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -2209,35 +2209,43 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry) /// Check that we could cover whole range for (PartDescriptionPtr & part_desc : parts_to_add) { - if (adding_parts_active_set.getContainingPart(part_desc->new_part_info).empty()) + if (!adding_parts_active_set.getContainingPart(part_desc->new_part_info).empty()) + continue; + + MergeTreePartInfo covering_drop_range; + if (queue.isGoingToBeDropped(part_desc->new_part_info, &covering_drop_range)) { - /// We should enqueue missing part for check, so it will be replaced with empty one (if needed) - /// and we will be able to execute this REPLACE_RANGE. - /// However, it's quite dangerous, because part may appear in source table. - /// So we enqueue it for check only if no replicas of source table have part either. - bool need_check = true; - if (auto * replicated_src_table = typeid_cast(source_table.get())) - { - String src_replica = replicated_src_table->findReplicaHavingPart(part_desc->src_part_name, false); - if (!src_replica.empty()) - { - LOG_DEBUG(log, "Found part {} on replica {} of source table, will not check part {} required for {}", - part_desc->src_part_name, src_replica, part_desc->new_part_name, entry.znode_name); - need_check = false; - } - } - - if (need_check) - { - LOG_DEBUG(log, "Will check part {} required for {}, because no replicas have it (including replicas of source table)", - part_desc->new_part_name, entry.znode_name); - enqueuePartForCheck(part_desc->new_part_name); - } - - throw Exception(ErrorCodes::NO_REPLICA_HAS_PART, - "Not found part {} (or part covering it) neither source table neither remote replicas", - part_desc->new_part_name); + LOG_WARNING(log, "Will not add part {} (while replacing {}) because it's going to be dropped (DROP_RANGE: {})", + part_desc->new_part_name, entry_replace.drop_range_part_name, covering_drop_range.getPartNameForLogs()); + continue; } + + /// We should enqueue missing part for check, so it will be replaced with empty one (if needed) + /// and we will be able to execute this REPLACE_RANGE. + /// However, it's quite dangerous, because part may appear in source table. + /// So we enqueue it for check only if no replicas of source table have part either. + bool need_check = true; + if (auto * replicated_src_table = typeid_cast(source_table.get())) + { + String src_replica = replicated_src_table->findReplicaHavingPart(part_desc->src_part_name, false); + if (!src_replica.empty()) + { + LOG_DEBUG(log, "Found part {} on replica {} of source table, will not check part {} required for {}", + part_desc->src_part_name, src_replica, part_desc->new_part_name, entry.znode_name); + need_check = false; + } + } + + if (need_check) + { + LOG_DEBUG(log, "Will check part {} required for {}, because no replicas have it (including replicas of source table)", + part_desc->new_part_name, entry.znode_name); + enqueuePartForCheck(part_desc->new_part_name); + } + + throw Exception(ErrorCodes::NO_REPLICA_HAS_PART, + "Not found part {} (or part covering it) neither source table neither remote replicas", + part_desc->new_part_name); } /// Filter covered parts @@ -7616,7 +7624,6 @@ bool StorageReplicatedMergeTree::waitForProcessingQueue(UInt64 max_wait_millisec if (removed_log_entry_id) wait_for_ids.erase(*removed_log_entry_id); - chassert(new_queue_size || wait_for_ids.empty()); if (wait_for_ids.empty()) target_entry_event.set(); };