mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-21 07:01:59 +00:00
Merge pull request #35946 from ClickHouse/faster_replicated_ddl
Make some replicated DDL queries faster
This commit is contained in:
commit
58db8cd9b3
@ -701,24 +701,34 @@ void ZooKeeper::removeChildrenRecursive(const std::string & path, const String &
|
||||
}
|
||||
}
|
||||
|
||||
void ZooKeeper::tryRemoveChildrenRecursive(const std::string & path, const String & keep_child_node)
|
||||
bool ZooKeeper::tryRemoveChildrenRecursive(const std::string & path, bool probably_flat, const String & keep_child_node)
|
||||
{
|
||||
Strings children;
|
||||
if (tryGetChildren(path, children) != Coordination::Error::ZOK)
|
||||
return;
|
||||
return false;
|
||||
|
||||
bool removed_as_expected = true;
|
||||
while (!children.empty())
|
||||
{
|
||||
Coordination::Requests ops;
|
||||
Strings batch;
|
||||
ops.reserve(MULTI_BATCH_SIZE);
|
||||
batch.reserve(MULTI_BATCH_SIZE);
|
||||
for (size_t i = 0; i < MULTI_BATCH_SIZE && !children.empty(); ++i)
|
||||
{
|
||||
String child_path = fs::path(path) / children.back();
|
||||
tryRemoveChildrenRecursive(child_path);
|
||||
|
||||
/// Will try to avoid recursive getChildren calls if child_path probably has no children.
|
||||
/// It may be extremely slow when path contain a lot of leaf children.
|
||||
if (!probably_flat)
|
||||
tryRemoveChildrenRecursive(child_path);
|
||||
|
||||
if (likely(keep_child_node.empty() || keep_child_node != children.back()))
|
||||
{
|
||||
batch.push_back(child_path);
|
||||
ops.emplace_back(zkutil::makeRemoveRequest(child_path, -1));
|
||||
}
|
||||
|
||||
children.pop_back();
|
||||
}
|
||||
|
||||
@ -726,10 +736,39 @@ void ZooKeeper::tryRemoveChildrenRecursive(const std::string & path, const Strin
|
||||
/// this means someone is concurrently removing these children and we will have
|
||||
/// to remove them one by one.
|
||||
Coordination::Responses responses;
|
||||
if (tryMulti(ops, responses) != Coordination::Error::ZOK)
|
||||
for (const std::string & child : batch)
|
||||
tryRemove(child);
|
||||
if (tryMulti(ops, responses) == Coordination::Error::ZOK)
|
||||
continue;
|
||||
|
||||
removed_as_expected = false;
|
||||
|
||||
std::vector<zkutil::ZooKeeper::FutureRemove> futures;
|
||||
futures.reserve(batch.size());
|
||||
for (const std::string & child : batch)
|
||||
futures.push_back(asyncTryRemoveNoThrow(child, -1));
|
||||
|
||||
for (size_t i = 0; i < batch.size(); ++i)
|
||||
{
|
||||
auto res = futures[i].get();
|
||||
if (res.error == Coordination::Error::ZOK)
|
||||
continue;
|
||||
if (res.error == Coordination::Error::ZNONODE)
|
||||
continue;
|
||||
|
||||
if (res.error == Coordination::Error::ZNOTEMPTY)
|
||||
{
|
||||
if (probably_flat)
|
||||
{
|
||||
/// It actually has children, let's remove them
|
||||
tryRemoveChildrenRecursive(batch[i]);
|
||||
tryRemove(batch[i]);
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
throw KeeperException(res.error, batch[i]);
|
||||
}
|
||||
}
|
||||
return removed_as_expected;
|
||||
}
|
||||
|
||||
void ZooKeeper::removeRecursive(const std::string & path)
|
||||
|
@ -225,7 +225,10 @@ public:
|
||||
/// If keep_child_node is not empty, this method will not remove path/keep_child_node (but will remove its subtree).
|
||||
/// It can be useful to keep some child node as a flag which indicates that path is currently removing.
|
||||
void removeChildrenRecursive(const std::string & path, const String & keep_child_node = {});
|
||||
void tryRemoveChildrenRecursive(const std::string & path, const String & keep_child_node = {});
|
||||
/// If probably_flat is true, this method will optimistically try to remove children non-recursive
|
||||
/// and will fall back to recursive removal if it gets ZNOTEMPTY for some child.
|
||||
/// Returns true if no kind of fallback happened.
|
||||
bool tryRemoveChildrenRecursive(const std::string & path, bool probably_flat = false, const String & keep_child_node = {});
|
||||
|
||||
/// Remove all children nodes (non recursive).
|
||||
void removeChildren(const std::string & path);
|
||||
|
@ -674,7 +674,6 @@ void DatabaseReplicated::recoverLostReplica(const ZooKeeperPtr & current_zookeep
|
||||
LOG_INFO(log, "Marked recovered {} as finished", entry_name);
|
||||
}
|
||||
}
|
||||
current_zookeeper->set(replica_path + "/log_ptr", toString(max_log_ptr));
|
||||
}
|
||||
|
||||
std::map<String, String> DatabaseReplicated::tryGetConsistentMetadataSnapshot(const ZooKeeperPtr & zookeeper, UInt32 & max_log_ptr)
|
||||
|
@ -66,9 +66,17 @@ void DatabaseReplicatedDDLWorker::initializeReplication()
|
||||
UInt32 max_log_ptr = parse<UInt32>(zookeeper->get(database->zookeeper_path + "/max_log_ptr"));
|
||||
logs_to_keep = parse<UInt32>(zookeeper->get(database->zookeeper_path + "/logs_to_keep"));
|
||||
if (our_log_ptr == 0 || our_log_ptr + logs_to_keep < max_log_ptr)
|
||||
{
|
||||
database->recoverLostReplica(zookeeper, our_log_ptr, max_log_ptr);
|
||||
zookeeper->set(database->replica_path + "/log_ptr", toString(max_log_ptr));
|
||||
initializeLogPointer(DDLTaskBase::getLogEntryName(max_log_ptr));
|
||||
}
|
||||
else
|
||||
last_skipped_entry_name.emplace(DDLTaskBase::getLogEntryName(our_log_ptr));
|
||||
{
|
||||
String log_entry_name = DDLTaskBase::getLogEntryName(our_log_ptr);
|
||||
last_skipped_entry_name.emplace(log_entry_name);
|
||||
initializeLogPointer(log_entry_name);
|
||||
}
|
||||
}
|
||||
|
||||
String DatabaseReplicatedDDLWorker::enqueueQuery(DDLLogEntry & entry)
|
||||
@ -140,10 +148,10 @@ String DatabaseReplicatedDDLWorker::tryEnqueueAndExecuteEntry(DDLLogEntry & entr
|
||||
/// but it requires more complex logic around /try node.
|
||||
|
||||
auto zookeeper = getAndSetZooKeeper();
|
||||
UInt32 our_log_ptr = parse<UInt32>(zookeeper->get(database->replica_path + "/log_ptr"));
|
||||
UInt32 our_log_ptr = getLogPointer();
|
||||
UInt32 max_log_ptr = parse<UInt32>(zookeeper->get(database->zookeeper_path + "/max_log_ptr"));
|
||||
assert(our_log_ptr <= max_log_ptr);
|
||||
if (database->db_settings.max_replication_lag_to_enqueue < max_log_ptr - our_log_ptr)
|
||||
|
||||
if (our_log_ptr + database->db_settings.max_replication_lag_to_enqueue < max_log_ptr)
|
||||
throw Exception(ErrorCodes::NOT_A_LEADER, "Cannot enqueue query on this replica, "
|
||||
"because it has replication lag of {} queries. Try other replica.", max_log_ptr - our_log_ptr);
|
||||
|
||||
@ -203,7 +211,7 @@ DDLTaskPtr DatabaseReplicatedDDLWorker::initAndCheckTask(const String & entry_na
|
||||
}
|
||||
}
|
||||
|
||||
UInt32 our_log_ptr = parse<UInt32>(zookeeper->get(fs::path(database->replica_path) / "log_ptr"));
|
||||
UInt32 our_log_ptr = getLogPointer();
|
||||
UInt32 entry_num = DatabaseReplicatedTask::getLogEntryNumber(entry_name);
|
||||
|
||||
if (entry_num <= our_log_ptr)
|
||||
@ -308,4 +316,18 @@ bool DatabaseReplicatedDDLWorker::canRemoveQueueEntry(const String & entry_name,
|
||||
return entry_number + logs_to_keep < max_log_ptr;
|
||||
}
|
||||
|
||||
void DatabaseReplicatedDDLWorker::initializeLogPointer(const String & processed_entry_name)
|
||||
{
|
||||
updateMaxDDLEntryID(processed_entry_name);
|
||||
assert(max_id.load() == parse<UInt32>(getAndSetZooKeeper()->get(fs::path(database->replica_path) / "log_ptr")));
|
||||
}
|
||||
|
||||
UInt32 DatabaseReplicatedDDLWorker::getLogPointer() const
|
||||
{
|
||||
/// NOTE it may not be equal to the log_ptr in zk:
|
||||
/// - max_id can be equal to log_ptr - 1 due to race condition (when it's updated in zk, but not updated in memory yet)
|
||||
/// - max_id can be greater than log_ptr, because log_ptr is not updated for failed and dummy entries
|
||||
return max_id.load();
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -32,9 +32,11 @@ public:
|
||||
static String enqueueQueryImpl(const ZooKeeperPtr & zookeeper, DDLLogEntry & entry,
|
||||
DatabaseReplicated * const database, bool committed = false); /// NOLINT
|
||||
|
||||
UInt32 getLogPointer() const;
|
||||
private:
|
||||
bool initializeMainThread() override;
|
||||
void initializeReplication();
|
||||
void initializeLogPointer(const String & processed_entry_name);
|
||||
|
||||
DDLTaskPtr initAndCheckTask(const String & entry_name, String & out_reason, const ZooKeeperPtr & zookeeper) override;
|
||||
bool canRemoveQueueEntry(const String & entry_name, const Coordination::Stat & stat) override;
|
||||
|
@ -632,8 +632,6 @@ void DDLWorker::processTask(DDLTaskBase & task, const ZooKeeperPtr & zookeeper)
|
||||
task.was_executed = true;
|
||||
}
|
||||
|
||||
updateMaxDDLEntryID(task.entry_name);
|
||||
|
||||
/// Step 3: Create node in finished/ status dir and write execution status.
|
||||
/// FIXME: if server fails right here, the task will be executed twice. We need WAL here.
|
||||
/// NOTE: If ZooKeeper connection is lost here, we will try again to write query status.
|
||||
@ -650,6 +648,7 @@ void DDLWorker::processTask(DDLTaskBase & task, const ZooKeeperPtr & zookeeper)
|
||||
active_node->setAlreadyRemoved();
|
||||
|
||||
task.completely_processed = true;
|
||||
updateMaxDDLEntryID(task.entry_name);
|
||||
}
|
||||
|
||||
|
||||
@ -876,7 +875,7 @@ void DDLWorker::cleanupQueue(Int64, const ZooKeeperPtr & zookeeper)
|
||||
|
||||
/// We recursively delete all nodes except node_path/finished to prevent staled hosts from
|
||||
/// creating node_path/active node (see createStatusDirs(...))
|
||||
zookeeper->tryRemoveChildrenRecursive(node_path, "finished");
|
||||
zookeeper->tryRemoveChildrenRecursive(node_path, /* probably_flat */ false, "finished");
|
||||
|
||||
/// And then we remove node_path and node_path/finished in a single transaction
|
||||
Coordination::Requests ops;
|
||||
|
@ -319,19 +319,22 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
|
||||
}
|
||||
|
||||
bool skip_sanity_checks = false;
|
||||
|
||||
if (current_zookeeper && current_zookeeper->exists(replica_path + "/flags/force_restore_data"))
|
||||
/// It does not make sense for CREATE query
|
||||
if (attach)
|
||||
{
|
||||
skip_sanity_checks = true;
|
||||
current_zookeeper->remove(replica_path + "/flags/force_restore_data");
|
||||
if (current_zookeeper && current_zookeeper->exists(replica_path + "/flags/force_restore_data"))
|
||||
{
|
||||
skip_sanity_checks = true;
|
||||
current_zookeeper->remove(replica_path + "/flags/force_restore_data");
|
||||
|
||||
LOG_WARNING(log, "Skipping the limits on severity of changes to data parts and columns (flag {}/flags/force_restore_data).", replica_path);
|
||||
}
|
||||
else if (has_force_restore_data_flag)
|
||||
{
|
||||
skip_sanity_checks = true;
|
||||
LOG_WARNING(log, "Skipping the limits on severity of changes to data parts and columns (flag {}/flags/force_restore_data).", replica_path);
|
||||
}
|
||||
else if (has_force_restore_data_flag)
|
||||
{
|
||||
skip_sanity_checks = true;
|
||||
|
||||
LOG_WARNING(log, "Skipping the limits on severity of changes to data parts and columns (flag force_restore_data).");
|
||||
LOG_WARNING(log, "Skipping the limits on severity of changes to data parts and columns (flag force_restore_data).");
|
||||
}
|
||||
}
|
||||
|
||||
loadDataParts(skip_sanity_checks);
|
||||
@ -569,35 +572,31 @@ void StorageReplicatedMergeTree::createNewZooKeeperNodes()
|
||||
{
|
||||
auto zookeeper = getZooKeeper();
|
||||
|
||||
/// Working with quorum.
|
||||
zookeeper->createIfNotExists(zookeeper_path + "/quorum", String());
|
||||
zookeeper->createIfNotExists(zookeeper_path + "/quorum/parallel", String());
|
||||
zookeeper->createIfNotExists(zookeeper_path + "/quorum/last_part", String());
|
||||
zookeeper->createIfNotExists(zookeeper_path + "/quorum/failed_parts", String());
|
||||
|
||||
/// Tracking lag of replicas.
|
||||
zookeeper->createIfNotExists(replica_path + "/min_unprocessed_insert_time", String());
|
||||
zookeeper->createIfNotExists(replica_path + "/max_processed_insert_time", String());
|
||||
|
||||
/// Mutations
|
||||
zookeeper->createIfNotExists(zookeeper_path + "/mutations", String());
|
||||
zookeeper->createIfNotExists(replica_path + "/mutation_pointer", String());
|
||||
std::vector<zkutil::ZooKeeper::FutureCreate> futures;
|
||||
futures.push_back(zookeeper->asyncTryCreateNoThrow(zookeeper_path + "/quorum/parallel", String(), zkutil::CreateMode::Persistent));
|
||||
|
||||
/// Nodes for remote fs zero-copy replication
|
||||
const auto settings = getSettings();
|
||||
if (settings->allow_remote_fs_zero_copy_replication)
|
||||
{
|
||||
zookeeper->createIfNotExists(zookeeper_path + "/zero_copy_s3", String());
|
||||
zookeeper->createIfNotExists(zookeeper_path + "/zero_copy_s3/shared", String());
|
||||
zookeeper->createIfNotExists(zookeeper_path + "/zero_copy_hdfs", String());
|
||||
zookeeper->createIfNotExists(zookeeper_path + "/zero_copy_hdfs/shared", String());
|
||||
futures.push_back(zookeeper->asyncTryCreateNoThrow(zookeeper_path + "/zero_copy_s3", String(), zkutil::CreateMode::Persistent));
|
||||
futures.push_back(zookeeper->asyncTryCreateNoThrow(zookeeper_path + "/zero_copy_s3/shared", String(), zkutil::CreateMode::Persistent));
|
||||
futures.push_back(zookeeper->asyncTryCreateNoThrow(zookeeper_path + "/zero_copy_hdfs", String(), zkutil::CreateMode::Persistent));
|
||||
futures.push_back(zookeeper->asyncTryCreateNoThrow(zookeeper_path + "/zero_copy_hdfs/shared", String(), zkutil::CreateMode::Persistent));
|
||||
}
|
||||
|
||||
/// Part movement.
|
||||
zookeeper->createIfNotExists(zookeeper_path + "/part_moves_shard", String());
|
||||
zookeeper->createIfNotExists(zookeeper_path + "/pinned_part_uuids", getPinnedPartUUIDs()->toString());
|
||||
futures.push_back(zookeeper->asyncTryCreateNoThrow(zookeeper_path + "/part_moves_shard", String(), zkutil::CreateMode::Persistent));
|
||||
futures.push_back(zookeeper->asyncTryCreateNoThrow(zookeeper_path + "/pinned_part_uuids", getPinnedPartUUIDs()->toString(), zkutil::CreateMode::Persistent));
|
||||
/// For ALTER PARTITION with multi-leaders
|
||||
zookeeper->createIfNotExists(zookeeper_path + "/alter_partition_version", String());
|
||||
futures.push_back(zookeeper->asyncTryCreateNoThrow(zookeeper_path + "/alter_partition_version", String(), zkutil::CreateMode::Persistent));
|
||||
|
||||
for (auto & future : futures)
|
||||
{
|
||||
auto res = future.get();
|
||||
if (res.error != Coordination::Error::ZOK && res.error != Coordination::Error::ZNODEEXISTS)
|
||||
throw Coordination::Exception(fmt::format("Failed to create new nodes at {}", zookeeper_path), res.error);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -672,6 +671,16 @@ bool StorageReplicatedMergeTree::createTableIfNotExists(const StorageMetadataPtr
|
||||
ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/replicas", "last added replica: " + replica_name,
|
||||
zkutil::CreateMode::Persistent));
|
||||
|
||||
/// The following 4 nodes were added in version 1.1.xxx, so we create them here, not in createNewZooKeeperNodes()
|
||||
ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/quorum", "",
|
||||
zkutil::CreateMode::Persistent));
|
||||
ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/quorum/last_part", "",
|
||||
zkutil::CreateMode::Persistent));
|
||||
ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/quorum/failed_parts", "",
|
||||
zkutil::CreateMode::Persistent));
|
||||
ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/mutations", "",
|
||||
zkutil::CreateMode::Persistent));
|
||||
|
||||
/// And create first replica atomically. See also "createReplica" method that is used to create not the first replicas.
|
||||
|
||||
ops.emplace_back(zkutil::makeCreateRequest(replica_path, "",
|
||||
@ -695,6 +704,14 @@ bool StorageReplicatedMergeTree::createTableIfNotExists(const StorageMetadataPtr
|
||||
ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/metadata_version", std::to_string(metadata_version),
|
||||
zkutil::CreateMode::Persistent));
|
||||
|
||||
/// The following 3 nodes were added in version 1.1.xxx, so we create them here, not in createNewZooKeeperNodes()
|
||||
ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/min_unprocessed_insert_time", "",
|
||||
zkutil::CreateMode::Persistent));
|
||||
ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/max_processed_insert_time", "",
|
||||
zkutil::CreateMode::Persistent));
|
||||
ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/mutation_pointer", "",
|
||||
zkutil::CreateMode::Persistent));
|
||||
|
||||
Coordination::Responses responses;
|
||||
auto code = zookeeper->tryMulti(ops, responses);
|
||||
if (code == Coordination::Error::ZNODEEXISTS)
|
||||
@ -760,6 +777,14 @@ void StorageReplicatedMergeTree::createReplica(const StorageMetadataPtr & metada
|
||||
ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/metadata_version", std::to_string(metadata_version),
|
||||
zkutil::CreateMode::Persistent));
|
||||
|
||||
/// The following 3 nodes were added in version 1.1.xxx, so we create them here, not in createNewZooKeeperNodes()
|
||||
ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/min_unprocessed_insert_time", "",
|
||||
zkutil::CreateMode::Persistent));
|
||||
ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/max_processed_insert_time", "",
|
||||
zkutil::CreateMode::Persistent));
|
||||
ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/mutation_pointer", "",
|
||||
zkutil::CreateMode::Persistent));
|
||||
|
||||
/// Check version of /replicas to see if there are any replicas created at the same moment of time.
|
||||
ops.emplace_back(zkutil::makeSetRequest(zookeeper_path + "/replicas", "last added replica: " + replica_name, replicas_stat.version));
|
||||
|
||||
@ -803,13 +828,14 @@ void StorageReplicatedMergeTree::drop()
|
||||
throw Exception("Can't drop readonly replicated table (need to drop data in ZooKeeper as well)", ErrorCodes::TABLE_IS_READ_ONLY);
|
||||
|
||||
shutdown();
|
||||
dropReplica(zookeeper, zookeeper_path, replica_name, log);
|
||||
dropReplica(zookeeper, zookeeper_path, replica_name, log, getSettings());
|
||||
}
|
||||
|
||||
dropAllData();
|
||||
}
|
||||
|
||||
void StorageReplicatedMergeTree::dropReplica(zkutil::ZooKeeperPtr zookeeper, const String & zookeeper_path, const String & replica, Poco::Logger * logger)
|
||||
void StorageReplicatedMergeTree::dropReplica(zkutil::ZooKeeperPtr zookeeper, const String & zookeeper_path, const String & replica,
|
||||
Poco::Logger * logger, MergeTreeSettingsPtr table_settings)
|
||||
{
|
||||
if (zookeeper->expired())
|
||||
throw Exception("Table was not dropped because ZooKeeper session has expired.", ErrorCodes::TABLE_WAS_NOT_DROPPED);
|
||||
@ -841,18 +867,43 @@ void StorageReplicatedMergeTree::dropReplica(zkutil::ZooKeeperPtr zookeeper, con
|
||||
/// "The local set of parts of table X doesn't look like the set of parts in ZooKeeper"
|
||||
///
|
||||
{
|
||||
Strings children = zookeeper->getChildren(remote_replica_path);
|
||||
/// Remove metadata first
|
||||
[[maybe_unused]] auto code = zookeeper->tryRemove(fs::path(remote_replica_path) / "metadata");
|
||||
assert(code == Coordination::Error::ZOK || code == Coordination::Error::ZNONODE);
|
||||
|
||||
if (std::find(children.begin(), children.end(), "metadata") != children.end())
|
||||
zookeeper->remove(fs::path(remote_replica_path) / "metadata");
|
||||
|
||||
for (const auto & child : children)
|
||||
/// Then try to remove paths that are known to be flat (all children are leafs)
|
||||
Strings flat_nodes = {"flags", "queue"};
|
||||
if (table_settings && table_settings->use_minimalistic_part_header_in_zookeeper)
|
||||
flat_nodes.emplace_back("parts");
|
||||
for (const auto & node : flat_nodes)
|
||||
{
|
||||
if (child != "metadata")
|
||||
zookeeper->removeRecursive(fs::path(remote_replica_path) / child);
|
||||
bool removed_quickly = zookeeper->tryRemoveChildrenRecursive(fs::path(remote_replica_path) / node, /* probably flat */ true);
|
||||
if (!removed_quickly)
|
||||
LOG_WARNING(logger, "Failed to quickly remove node '{}' and its children, fell back to recursive removal (replica: {})",
|
||||
node, remote_replica_path);
|
||||
}
|
||||
|
||||
zookeeper->remove(remote_replica_path);
|
||||
/// Then try to remove nodes that are known to have no children (and should always exist)
|
||||
Coordination::Requests ops;
|
||||
for (const auto & node : flat_nodes)
|
||||
ops.emplace_back(zkutil::makeRemoveRequest(remote_replica_path + "/" + node, -1));
|
||||
|
||||
ops.emplace_back(zkutil::makeRemoveRequest(remote_replica_path + "/columns", -1));
|
||||
ops.emplace_back(zkutil::makeRemoveRequest(remote_replica_path + "/host", -1));
|
||||
ops.emplace_back(zkutil::makeRemoveRequest(remote_replica_path + "/is_lost", -1));
|
||||
ops.emplace_back(zkutil::makeRemoveRequest(remote_replica_path + "/log_pointer", -1));
|
||||
ops.emplace_back(zkutil::makeRemoveRequest(remote_replica_path + "/max_processed_insert_time", -1));
|
||||
ops.emplace_back(zkutil::makeRemoveRequest(remote_replica_path + "/min_unprocessed_insert_time", -1));
|
||||
ops.emplace_back(zkutil::makeRemoveRequest(remote_replica_path + "/metadata_version", -1));
|
||||
ops.emplace_back(zkutil::makeRemoveRequest(remote_replica_path + "/mutation_pointer", -1));
|
||||
Coordination::Responses res;
|
||||
code = zookeeper->tryMulti(ops, res);
|
||||
if (code != Coordination::Error::ZOK)
|
||||
LOG_WARNING(logger, "Cannot quickly remove nodes without children: {} (replica: {}). Will remove recursively.",
|
||||
Coordination::errorMessage(code), remote_replica_path);
|
||||
|
||||
/// And finally remove everything else recursively
|
||||
zookeeper->tryRemoveRecursive(remote_replica_path);
|
||||
}
|
||||
|
||||
/// It may left some garbage if replica_path subtree are concurrently modified
|
||||
@ -912,17 +963,47 @@ bool StorageReplicatedMergeTree::removeTableNodesFromZooKeeper(zkutil::ZooKeeper
|
||||
const String & zookeeper_path, const zkutil::EphemeralNodeHolder::Ptr & metadata_drop_lock, Poco::Logger * logger)
|
||||
{
|
||||
bool completely_removed = false;
|
||||
|
||||
/// NOTE /block_numbers/ actually is not flat, because /block_numbers/<partition_id>/ may have ephemeral children,
|
||||
/// but we assume that all ephemeral block locks are already removed when table is being dropped.
|
||||
static constexpr std::array flat_nodes = {"block_numbers", "blocks", "leader_election", "log", "mutations", "pinned_part_uuids"};
|
||||
|
||||
/// First try to remove paths that are known to be flat
|
||||
for (const auto * node : flat_nodes)
|
||||
{
|
||||
bool removed_quickly = zookeeper->tryRemoveChildrenRecursive(fs::path(zookeeper_path) / node, /* probably flat */ true);
|
||||
if (!removed_quickly)
|
||||
LOG_WARNING(logger, "Failed to quickly remove node '{}' and its children, fell back to recursive removal (table: {})",
|
||||
node, zookeeper_path);
|
||||
}
|
||||
|
||||
/// Then try to remove nodes that are known to have no children (and should always exist)
|
||||
Coordination::Requests ops;
|
||||
for (const auto * node : flat_nodes)
|
||||
ops.emplace_back(zkutil::makeRemoveRequest(zookeeper_path + "/" + node, -1));
|
||||
|
||||
ops.emplace_back(zkutil::makeRemoveRequest(zookeeper_path + "/alter_partition_version", -1));
|
||||
ops.emplace_back(zkutil::makeRemoveRequest(zookeeper_path + "/columns", -1));
|
||||
ops.emplace_back(zkutil::makeRemoveRequest(zookeeper_path + "/metadata", -1));
|
||||
ops.emplace_back(zkutil::makeRemoveRequest(zookeeper_path + "/table_shared_id", -1));
|
||||
Coordination::Responses res;
|
||||
auto code = zookeeper->tryMulti(ops, res);
|
||||
if (code != Coordination::Error::ZOK)
|
||||
LOG_WARNING(logger, "Cannot quickly remove nodes without children: {} (table: {}). Will remove recursively.",
|
||||
Coordination::errorMessage(code), zookeeper_path);
|
||||
|
||||
Strings children;
|
||||
Coordination::Error code = zookeeper->tryGetChildren(zookeeper_path, children);
|
||||
code = zookeeper->tryGetChildren(zookeeper_path, children);
|
||||
if (code == Coordination::Error::ZNONODE)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "There is a race condition between creation and removal of replicated table. It's a bug");
|
||||
|
||||
|
||||
for (const auto & child : children)
|
||||
{
|
||||
if (child != "dropped")
|
||||
zookeeper->tryRemoveRecursive(fs::path(zookeeper_path) / child);
|
||||
}
|
||||
|
||||
Coordination::Requests ops;
|
||||
ops.clear();
|
||||
Coordination::Responses responses;
|
||||
ops.emplace_back(zkutil::makeRemoveRequest(metadata_drop_lock->getPath(), -1));
|
||||
ops.emplace_back(zkutil::makeRemoveRequest(fs::path(zookeeper_path) / "dropped", -1));
|
||||
@ -4328,7 +4409,7 @@ std::optional<UInt64> StorageReplicatedMergeTree::totalBytes(const Settings & se
|
||||
void StorageReplicatedMergeTree::assertNotReadonly() const
|
||||
{
|
||||
if (is_readonly)
|
||||
throw Exception(ErrorCodes::TABLE_IS_READ_ONLY, "Table is in readonly mode (zookeeper path: {})", zookeeper_path);
|
||||
throw Exception(ErrorCodes::TABLE_IS_READ_ONLY, "Table is in readonly mode (replica path: {})", replica_path);
|
||||
}
|
||||
|
||||
|
||||
|
@ -214,7 +214,8 @@ public:
|
||||
|
||||
/** Remove a specific replica from zookeeper.
|
||||
*/
|
||||
static void dropReplica(zkutil::ZooKeeperPtr zookeeper, const String & zookeeper_path, const String & replica, Poco::Logger * logger);
|
||||
static void dropReplica(zkutil::ZooKeeperPtr zookeeper, const String & zookeeper_path, const String & replica,
|
||||
Poco::Logger * logger, MergeTreeSettingsPtr table_settings = nullptr);
|
||||
|
||||
/// Removes table from ZooKeeper after the last replica was dropped
|
||||
static bool removeTableNodesFromZooKeeper(zkutil::ZooKeeperPtr zookeeper, const String & zookeeper_path,
|
||||
|
Loading…
Reference in New Issue
Block a user