This commit is contained in:
Vitaly Baranov 2024-11-21 13:00:48 +01:00 committed by GitHub
commit 5289a95cef
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 42 additions and 8 deletions

View File

@ -316,9 +316,16 @@ zkutil::ZooKeeperPtr StorageReplicatedMergeTree::getZooKeeperAndAssertNotReadonl
/// There's a short period of time after connection loss when new session is created, /// There's a short period of time after connection loss when new session is created,
/// but replication queue is not reinitialized. We must ensure that table is not readonly anymore /// but replication queue is not reinitialized. We must ensure that table is not readonly anymore
/// before using new ZooKeeper session to write something (except maybe GET_PART) into replication log. /// before using new ZooKeeper session to write something (except maybe GET_PART) into replication log.
auto res = getZooKeeper(); return getZooKeeperIfNotReadonly(/* assert_if_readonly = */ true, /* assert_if_readonly_while_shutdown = */ true);
assertNotReadonly(); }
return res;
zkutil::ZooKeeperPtr StorageReplicatedMergeTree::getZooKeeperIfNotReadonly(bool assert_if_readonly, bool assert_if_readonly_while_shutdown) const
{
auto zookeeper = getZooKeeper();
bool readonly = checkIfReadonly(assert_if_readonly, assert_if_readonly_while_shutdown);
if (readonly)
return nullptr;
return zookeeper;
} }
String StorageReplicatedMergeTree::getEndpointName() const String StorageReplicatedMergeTree::getEndpointName() const
@ -3650,9 +3657,14 @@ void StorageReplicatedMergeTree::queueUpdatingTask()
last_queue_update_start_time.store(time(nullptr)); last_queue_update_start_time.store(time(nullptr));
queue_update_in_progress = true; queue_update_in_progress = true;
} }
try try
{ {
queue.pullLogsToQueue(getZooKeeperAndAssertNotReadonly(), queue_updating_task->getWatchCallback(), ReplicatedMergeTreeQueue::UPDATE); auto zookeeper = getZooKeeperIfNotReadonly(/* assert_if_readonly = */ true, /* assert_if_readonly_while_shutdown = */ false);
if (!zookeeper)
return; /// readonly because shutdown() was called
queue.pullLogsToQueue(zookeeper, queue_updating_task->getWatchCallback(), ReplicatedMergeTreeQueue::UPDATE);
last_queue_update_finish_time.store(time(nullptr)); last_queue_update_finish_time.store(time(nullptr));
queue_update_in_progress = false; queue_update_in_progress = false;
} }
@ -3884,7 +3896,9 @@ void StorageReplicatedMergeTree::mergeSelectingTask()
/// (OPTIMIZE queries) can assign new merges. /// (OPTIMIZE queries) can assign new merges.
std::lock_guard merge_selecting_lock(merge_selecting_mutex); std::lock_guard merge_selecting_lock(merge_selecting_mutex);
auto zookeeper = getZooKeeperAndAssertNotReadonly(); auto zookeeper = getZooKeeperIfNotReadonly(/* assert_if_readonly = */ true, /* assert_if_readonly_while_shutdown = */ false);
if (!zookeeper)
return AttemptStatus::CannotSelect; /// readonly because shutdown() was called
std::optional<ReplicatedMergeTreeMergePredicate> merge_pred; std::optional<ReplicatedMergeTreeMergePredicate> merge_pred;
@ -4038,7 +4052,11 @@ void StorageReplicatedMergeTree::mutationsFinalizingTask()
try try
{ {
needs_reschedule = queue.tryFinalizeMutations(getZooKeeperAndAssertNotReadonly()); auto zookeeper = getZooKeeperIfNotReadonly(/* assert_if_readonly = */ true, /* assert_if_readonly_while_shutdown = */ false);
if (!zookeeper)
return; /// readonly because shutdown() was called
needs_reschedule = queue.tryFinalizeMutations(zookeeper);
} }
catch (...) catch (...)
{ {
@ -5711,11 +5729,25 @@ std::optional<UInt64> StorageReplicatedMergeTree::totalBytesUncompressed(const S
} }
void StorageReplicatedMergeTree::assertNotReadonly() const void StorageReplicatedMergeTree::assertNotReadonly() const
{
checkIfReadonly(/* assert_if_readonly = */ true, /* assert_if_readonly_while_shutdown = */ true);
}
bool StorageReplicatedMergeTree::checkIfReadonly(bool assert_if_readonly, bool assert_if_readonly_while_shutdown) const
{ {
if (is_readonly) if (is_readonly)
throw Exception(ErrorCodes::TABLE_IS_READ_ONLY, "Table is in readonly mode (replica path: {})", replica_path); {
if (assert_if_readonly && (assert_if_readonly_while_shutdown || !(shutdown_called || partial_shutdown_called)))
throw Exception(ErrorCodes::TABLE_IS_READ_ONLY, "Table is in readonly mode (replica path: {})", replica_path);
return true; /// readonly
}
if (isStaticStorage()) if (isStaticStorage())
throw Exception(ErrorCodes::TABLE_IS_READ_ONLY, "Table is in readonly mode due to static storage"); {
if (assert_if_readonly && (assert_if_readonly_while_shutdown || !(shutdown_called || partial_shutdown_called)))
throw Exception(ErrorCodes::TABLE_IS_READ_ONLY, "Table is in readonly mode due to static storage");
return true; /// readonly
}
return false; /// not readonly
} }

View File

@ -404,6 +404,7 @@ private:
/// for table. /// for table.
zkutil::ZooKeeperPtr getZooKeeperIfTableShutDown() const; zkutil::ZooKeeperPtr getZooKeeperIfTableShutDown() const;
zkutil::ZooKeeperPtr getZooKeeperAndAssertNotReadonly() const; zkutil::ZooKeeperPtr getZooKeeperAndAssertNotReadonly() const;
zkutil::ZooKeeperPtr getZooKeeperIfNotReadonly(bool assert_if_readonly, bool assert_if_readonly_while_shutdown) const;
void setZooKeeper(); void setZooKeeper();
String getEndpointName() const; String getEndpointName() const;
@ -841,6 +842,7 @@ private:
/// Throw an exception if the table is readonly. /// Throw an exception if the table is readonly.
void assertNotReadonly() const; void assertNotReadonly() const;
bool checkIfReadonly(bool assert_if_readonly, bool assert_if_readonly_while_shutdown) const;
/// Produce an imaginary part info covering all parts in the specified partition (at the call moment). /// Produce an imaginary part info covering all parts in the specified partition (at the call moment).
/// Returns false if the partition doesn't exist yet. /// Returns false if the partition doesn't exist yet.