Merge pull request #48550 from ClickHouse/fix_sync_replica

Fix some tests
This commit is contained in:
Alexander Tokmakov 2023-04-08 14:41:10 +03:00 committed by GitHub
commit bf20e6bfc6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 44 additions and 29 deletions

View File

@ -34,6 +34,8 @@
#include <Interpreters/MergeTreeTransaction.h>
#include <Interpreters/TransactionLog.h>
#include <Disks/IO/CachedOnDiskReadBufferFromFile.h>
namespace CurrentMetrics
{
@ -1525,6 +1527,10 @@ bool IMergeTreeDataPart::assertHasValidVersionMetadata() const
size_t file_size = getDataPartStorage().getFileSize(TXN_VERSION_METADATA_FILE_NAME);
auto buf = getDataPartStorage().readFile(TXN_VERSION_METADATA_FILE_NAME, ReadSettings().adjustBufferSize(file_size), file_size, std::nullopt);
/// FIXME https://github.com/ClickHouse/ClickHouse/issues/48465
if (dynamic_cast<CachedOnDiskReadBufferFromFile *>(buf.get()))
return true;
readStringUntilEOF(content, *buf);
ReadBufferFromString str_buf{content};
VersionMetadata file;

View File

@ -1147,7 +1147,8 @@ void ReplicatedMergeTreeQueue::removePartProducingOpsInRange(
*it, /* is_successful = */ false,
min_unprocessed_insert_time_changed, max_processed_insert_time_changed, lock);
(*it)->removed_by_other_entry = true;
LogEntryPtr removing_entry = std::move(*it); /// Make it live a bit longer
removing_entry->removed_by_other_entry = true;
it = queue.erase(it);
notifySubscribers(queue.size(), &znode_name);
++removed_entries;
@ -2491,6 +2492,7 @@ ReplicatedMergeTreeQueue::addSubscriber(ReplicatedMergeTreeQueue::SubscriberCall
|| std::find(lightweight_entries.begin(), lightweight_entries.end(), entry->type) != lightweight_entries.end())
out_entry_names.insert(entry->znode_name);
}
LOG_TEST(log, "Waiting for {} entries to be processed: {}", out_entry_names.size(), fmt::join(out_entry_names, ", "));
}
auto it = subscribers.emplace(subscribers.end(), std::move(callback));

View File

@ -2209,8 +2209,17 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry)
/// Check that we could cover whole range
for (PartDescriptionPtr & part_desc : parts_to_add)
{
if (adding_parts_active_set.getContainingPart(part_desc->new_part_info).empty())
if (!adding_parts_active_set.getContainingPart(part_desc->new_part_info).empty())
continue;
MergeTreePartInfo covering_drop_range;
if (queue.isGoingToBeDropped(part_desc->new_part_info, &covering_drop_range))
{
LOG_WARNING(log, "Will not add part {} (while replacing {}) because it's going to be dropped (DROP_RANGE: {})",
part_desc->new_part_name, entry_replace.drop_range_part_name, covering_drop_range.getPartNameForLogs());
continue;
}
/// We should enqueue missing part for check, so it will be replaced with empty one (if needed)
/// and we will be able to execute this REPLACE_RANGE.
/// However, it's quite dangerous, because part may appear in source table.
@ -2238,7 +2247,6 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry)
"Not found part {} (or part covering it) neither source table neither remote replicas",
part_desc->new_part_name);
}
}
/// Filter covered parts
PartDescriptions final_parts;
@ -7616,7 +7624,6 @@ bool StorageReplicatedMergeTree::waitForProcessingQueue(UInt64 max_wait_millisec
if (removed_log_entry_id)
wait_for_ids.erase(*removed_log_entry_id);
chassert(new_queue_size || wait_for_ids.empty());
if (wait_for_ids.empty())
target_entry_event.set();
};