Merge pull request #27808 from ClickHouse/fix_replicas_may_diverge

Fix a couple of bugs that may cause replicas to diverge
This commit is contained in:
alesapin 2021-08-19 09:58:07 +03:00 committed by GitHub
commit 0aec151719
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 81 additions and 32 deletions

View File

@ -145,6 +145,7 @@ ReplicatedMergeTreePartCheckThread::MissingPartSearchResult ReplicatedMergeTreeP
if (found_part_with_the_same_min_block && found_part_with_the_same_max_block)
{
/// FIXME It may never appear
LOG_WARNING(log, "Found parts with the same min block and with the same max block as the missing part {}. Hoping that it will eventually appear as a result of a merge.", part_name);
return MissingPartSearchResult::FoundAndDontNeedFetch;
}

View File

@ -23,6 +23,7 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
extern const int UNEXPECTED_NODE_IN_ZOOKEEPER;
extern const int ABORTED;
extern const int READONLY;
}
@ -472,9 +473,15 @@ bool ReplicatedMergeTreeQueue::removeFailedQuorumPart(const MergeTreePartInfo &
return virtual_parts.remove(part_info);
}
int32_t ReplicatedMergeTreeQueue::pullLogsToQueue(zkutil::ZooKeeperPtr zookeeper, Coordination::WatchCallback watch_callback)
int32_t ReplicatedMergeTreeQueue::pullLogsToQueue(zkutil::ZooKeeperPtr zookeeper, Coordination::WatchCallback watch_callback, PullLogsReason reason)
{
std::lock_guard lock(pull_logs_to_queue_mutex);
if (storage.is_readonly && reason == SYNC)
{
throw Exception(ErrorCodes::READONLY, "Cannot SYNC REPLICA, because replica is readonly");
/// TODO throw logical error for other reasons (except LOAD)
}
if (pull_log_blocker.isCancelled())
throw Exception("Log pulling is cancelled", ErrorCodes::ABORTED);
@ -714,13 +721,22 @@ void ReplicatedMergeTreeQueue::updateMutations(zkutil::ZooKeeperPtr zookeeper, C
std::vector<std::future<Coordination::GetResponse>> futures;
for (const String & entry : entries_to_load)
futures.emplace_back(zookeeper->asyncGet(fs::path(zookeeper_path) / "mutations" / entry));
futures.emplace_back(zookeeper->asyncTryGet(fs::path(zookeeper_path) / "mutations" / entry));
std::vector<ReplicatedMergeTreeMutationEntryPtr> new_mutations;
for (size_t i = 0; i < entries_to_load.size(); ++i)
{
auto maybe_response = futures[i].get();
if (maybe_response.error != Coordination::Error::ZOK)
{
assert(maybe_response.error == Coordination::Error::ZNONODE);
/// It's ok if it happened on server startup or table creation and replica loads all mutation entries.
/// It's also ok if mutation was killed.
LOG_WARNING(log, "Cannot get mutation node {} ({}), probably it was concurrently removed", entries_to_load[i], maybe_response.error);
continue;
}
new_mutations.push_back(std::make_shared<ReplicatedMergeTreeMutationEntry>(
ReplicatedMergeTreeMutationEntry::parse(futures[i].get().data, entries_to_load[i])));
ReplicatedMergeTreeMutationEntry::parse(maybe_response.data, entries_to_load[i])));
}
bool some_mutations_are_probably_done = false;
@ -1504,6 +1520,9 @@ MutationCommands ReplicatedMergeTreeQueue::getMutationCommands(
/// to allow recovering from a mutation that cannot be executed. This way you can delete the mutation entry
/// from /mutations in ZK and the replicas will simply skip the mutation.
/// NOTE: However, it's quite dangerous to skip MUTATE_PART. Replicas may diverge if one of them have executed part mutation,
/// and then mutation was killed before execution of MUTATE_PART on remaining replicas.
if (part->info.getDataVersion() > desired_mutation_version)
{
LOG_WARNING(log, "Data version of part {} is already greater than desired mutation version {}", part->name, desired_mutation_version);
@ -1831,7 +1850,7 @@ ReplicatedMergeTreeMergePredicate::ReplicatedMergeTreeMergePredicate(
}
}
merges_version = queue_.pullLogsToQueue(zookeeper);
merges_version = queue_.pullLogsToQueue(zookeeper, {}, ReplicatedMergeTreeQueue::MERGE_PREDICATE);
{
/// We avoid returning here a version to be used in a lightweight transaction.

View File

@ -294,13 +294,22 @@ public:
bool removeFailedQuorumPart(const MergeTreePartInfo & part_info);
enum PullLogsReason
{
LOAD,
UPDATE,
MERGE_PREDICATE,
SYNC,
OTHER,
};
/** Copy the new entries from the shared log to the queue of this replica. Set the log_pointer to the appropriate value.
* If watch_callback is not empty, will call it when new entries appear in the log.
* If there were new entries, notifies storage.queue_task_handle.
* Additionally loads mutations (so that the set of mutations is always more recent than the queue).
* Return the version of "logs" node (that is updated for every merge/mutation/... added to the log)
*/
int32_t pullLogsToQueue(zkutil::ZooKeeperPtr zookeeper, Coordination::WatchCallback watch_callback = {});
int32_t pullLogsToQueue(zkutil::ZooKeeperPtr zookeeper, Coordination::WatchCallback watch_callback = {}, PullLogsReason reason = OTHER);
/// Load new mutation entries. If something new is loaded, schedule storage.merge_selecting_task.
/// If watch_callback is not empty, will call it when new mutations appear in ZK.

View File

@ -25,6 +25,8 @@ namespace DB
namespace ErrorCodes
{
extern const int REPLICA_IS_ALREADY_ACTIVE;
extern const int REPLICA_STATUS_CHANGED;
}
namespace
@ -55,6 +57,7 @@ void ReplicatedMergeTreeRestartingThread::run()
if (need_stop)
return;
bool reschedule_now = false;
try
{
if (first_time || readonly_mode_was_set || storage.getZooKeeper()->expired())
@ -131,15 +134,29 @@ void ReplicatedMergeTreeRestartingThread::run()
first_time = false;
}
}
catch (...)
catch (const Exception & e)
{
/// We couldn't activate table let's set it into readonly mode
setReadonly();
partialShutdown();
storage.startup_event.set();
tryLogCurrentException(log, __PRETTY_FUNCTION__);
if (e.code() == ErrorCodes::REPLICA_STATUS_CHANGED)
reschedule_now = true;
}
catch (...)
{
setReadonly();
partialShutdown();
storage.startup_event.set();
tryLogCurrentException(log, __PRETTY_FUNCTION__);
}
task->scheduleAfter(check_period_ms);
if (reschedule_now)
task->schedule();
else
task->scheduleAfter(check_period_ms);
}
@ -159,7 +176,7 @@ bool ReplicatedMergeTreeRestartingThread::tryStartup()
/// pullLogsToQueue() after we mark replica 'is_active' (and after we repair if it was lost);
/// because cleanup_thread doesn't delete log_pointer of active replicas.
storage.queue.pullLogsToQueue(zookeeper);
storage.queue.pullLogsToQueue(zookeeper, {}, ReplicatedMergeTreeQueue::LOAD);
storage.queue.removeCurrentPartsFromMutations();
storage.last_queue_update_finish_time.store(time(nullptr));

View File

@ -141,6 +141,7 @@ namespace ErrorCodes
extern const int DUPLICATE_DATA_PART;
extern const int BAD_ARGUMENTS;
extern const int CONCURRENT_ACCESS_NOT_SUPPORTED;
extern const int CHECKSUM_DOESNT_MATCH;
}
namespace ActionLocks
@ -1314,32 +1315,35 @@ void StorageReplicatedMergeTree::checkPartChecksumsAndAddCommitOps(const zkutil:
}
ReplicatedMergeTreePartHeader replica_part_header;
if (!part_zk_str.empty())
replica_part_header = ReplicatedMergeTreePartHeader::fromString(part_zk_str);
else
if (part_zk_str.empty())
{
Coordination::Stat columns_stat_before, columns_stat_after;
String columns_str;
String checksums_str;
/// Let's check that the node's version with the columns did not change while we were reading the checksums.
/// This ensures that the columns and the checksum refer to the same
if (!zookeeper->tryGet(fs::path(current_part_path) / "columns", columns_str, &columns_stat_before) ||
!zookeeper->tryGet(fs::path(current_part_path) / "checksums", checksums_str) ||
!zookeeper->exists(fs::path(current_part_path) / "columns", &columns_stat_after) ||
columns_stat_before.version != columns_stat_after.version)
if (zookeeper->tryGet(fs::path(current_part_path) / "columns", columns_str) &&
zookeeper->tryGet(fs::path(current_part_path) / "checksums", checksums_str))
{
LOG_INFO(log, "Not checking checksums of part {} with replica {} because part changed while we were reading its checksums", part_name, replica);
replica_part_header = ReplicatedMergeTreePartHeader::fromColumnsAndChecksumsZNodes(columns_str, checksums_str);
}
else
{
if (zookeeper->exists(current_part_path))
throw Exception(ErrorCodes::LOGICAL_ERROR, "Part {} has empty header and does not have columns and checksums. "
"Looks like a bug.", current_part_path);
LOG_INFO(log, "Not checking checksums of part {} with replica {} because part was removed from ZooKeeper", part_name, replica);
continue;
}
replica_part_header = ReplicatedMergeTreePartHeader::fromColumnsAndChecksumsZNodes(
columns_str, checksums_str);
}
else
{
replica_part_header = ReplicatedMergeTreePartHeader::fromString(part_zk_str);
}
if (replica_part_header.getColumnsHash() != local_part_header.getColumnsHash())
{
LOG_INFO(log, "Not checking checksums of part {} with replica {} because columns are different", part_name, replica);
continue;
/// Either it's a bug or ZooKeeper contains broken data.
/// TODO Fix KILL MUTATION and replace CHECKSUM_DOESNT_MATCH with LOGICAL_ERROR
/// (some replicas may skip killed mutation even if it was executed on other replicas)
throw Exception(ErrorCodes::CHECKSUM_DOESNT_MATCH, "Part {} from {} has different columns hash", part_name, replica);
}
replica_part_header.getChecksums().checkEqual(local_part_header.getChecksums(), true);
@ -2137,6 +2141,8 @@ bool StorageReplicatedMergeTree::executeFetch(LogEntry & entry)
if (!parts_for_merge.empty() && replica.empty())
{
LOG_INFO(log, "No active replica has part {}. Will fetch merged part instead.", entry.new_part_name);
/// We should enqueue it for check, because merged part may never appear if source part is lost
enqueuePartForCheck(entry.new_part_name);
return false;
}
@ -3083,7 +3089,7 @@ void StorageReplicatedMergeTree::queueUpdatingTask()
}
try
{
queue.pullLogsToQueue(getZooKeeper(), queue_updating_task->getWatchCallback());
queue.pullLogsToQueue(getZooKeeper(), queue_updating_task->getWatchCallback(), ReplicatedMergeTreeQueue::UPDATE);
last_queue_update_finish_time.store(time(nullptr));
queue_update_in_progress = false;
}
@ -4321,9 +4327,6 @@ void StorageReplicatedMergeTree::startup()
/// Wait while restarting_thread initializes LeaderElection (and so on) or makes first attempt to do it
startup_event.wait();
/// If we don't separate create/start steps, race condition will happen
/// between the assignment of queue_task_handle and queueTask that use the queue_task_handle.
background_executor.start();
startBackgroundMovesIfNeeded();
part_moves_between_shards_orchestrator.start();
@ -5460,9 +5463,9 @@ bool StorageReplicatedMergeTree::waitForTableReplicaToProcessLogEntry(
const auto & stop_waiting = [&]()
{
bool stop_waiting_itself = waiting_itself && (partial_shutdown_called || is_dropped);
bool stop_waiting_itself = waiting_itself && partial_shutdown_called;
bool stop_waiting_non_active = !wait_for_non_active && !getZooKeeper()->exists(fs::path(table_zookeeper_path) / "replicas" / replica / "is_active");
return stop_waiting_itself || stop_waiting_non_active;
return is_dropped || stop_waiting_itself || stop_waiting_non_active;
};
/// Don't recheck ZooKeeper too often
@ -6058,7 +6061,7 @@ CancellationCode StorageReplicatedMergeTree::killMutation(const String & mutatio
zkutil::ZooKeeperPtr zookeeper = getZooKeeper();
LOG_TRACE(log, "Killing mutation {}", mutation_id);
LOG_INFO(log, "Killing mutation {}", mutation_id);
auto mutation_entry = queue.removeMutation(zookeeper, mutation_id);
if (!mutation_entry)
@ -6964,7 +6967,7 @@ bool StorageReplicatedMergeTree::waitForShrinkingQueueSize(size_t queue_size, UI
Stopwatch watch;
/// Let's fetch new log entries firstly
queue.pullLogsToQueue(getZooKeeper());
queue.pullLogsToQueue(getZooKeeper(), {}, ReplicatedMergeTreeQueue::SYNC);
/// This is significant, because the execution of this task could be delayed at BackgroundPool.
/// And we force it to be executed.