This commit is contained in:
Alexander Tokmakov 2022-04-05 17:36:53 +02:00
parent d9e3e9b69f
commit 37a06eec1a
6 changed files with 23 additions and 23 deletions

View File

@ -674,8 +674,6 @@ void DatabaseReplicated::recoverLostReplica(const ZooKeeperPtr & current_zookeep
LOG_INFO(log, "Marked recovered {} as finished", entry_name);
}
}
current_zookeeper->set(replica_path + "/log_ptr", toString(max_log_ptr));
ddl_worker->updateLogPointer(DDLTaskBase::getLogEntryName(max_log_ptr));
}
std::map<String, String> DatabaseReplicated::tryGetConsistentMetadataSnapshot(const ZooKeeperPtr & zookeeper, UInt32 & max_log_ptr)

View File

@ -68,11 +68,14 @@ void DatabaseReplicatedDDLWorker::initializeReplication()
if (our_log_ptr == 0 || our_log_ptr + logs_to_keep < max_log_ptr)
{
database->recoverLostReplica(zookeeper, our_log_ptr, max_log_ptr);
zookeeper->set(database->replica_path + "/log_ptr", toString(max_log_ptr));
initializeLogPointer(DDLTaskBase::getLogEntryName(max_log_ptr));
}
else
{
last_skipped_entry_name.emplace(DDLTaskBase::getLogEntryName(our_log_ptr));
updateLogPointer(DDLTaskBase::getLogEntryName(our_log_ptr));
String log_entry_name = DDLTaskBase::getLogEntryName(our_log_ptr);
last_skipped_entry_name.emplace(log_entry_name);
initializeLogPointer(log_entry_name);
}
}
@ -313,7 +316,7 @@ bool DatabaseReplicatedDDLWorker::canRemoveQueueEntry(const String & entry_name,
return entry_number + logs_to_keep < max_log_ptr;
}
void DatabaseReplicatedDDLWorker::updateLogPointer(const String & processed_entry_name)
void DatabaseReplicatedDDLWorker::initializeLogPointer(const String & processed_entry_name)
{
updateMaxDDLEntryID(processed_entry_name);
assert(max_id.load() == parse<UInt32>(getAndSetZooKeeper()->get(fs::path(database->replica_path) / "log_ptr")));
@ -321,7 +324,7 @@ void DatabaseReplicatedDDLWorker::updateLogPointer(const String & processed_entr
UInt32 DatabaseReplicatedDDLWorker::getLogPointer() const
{
/// NOTE it main not be equal to the log_ptr in zk:
/// NOTE it may not be equal to the log_ptr in zk:
/// - max_id can be equal to log_ptr - 1 due to race condition (when it's updated in zk, but not updated in memory yet)
/// - max_id can be greater than log_ptr, because log_ptr is not updated for failed and dummy entries
return max_id.load();

View File

@ -32,11 +32,11 @@ public:
static String enqueueQueryImpl(const ZooKeeperPtr & zookeeper, DDLLogEntry & entry,
DatabaseReplicated * const database, bool committed = false); /// NOLINT
void updateLogPointer(const String & processed_entry_name);
UInt32 getLogPointer() const;
private:
bool initializeMainThread() override;
void initializeReplication();
void initializeLogPointer(const String & processed_entry_name);
DDLTaskPtr initAndCheckTask(const String & entry_name, String & out_reason, const ZooKeeperPtr & zookeeper) override;
bool canRemoveQueueEntry(const String & entry_name, const Coordination::Stat & stat) override;

View File

@ -632,8 +632,6 @@ void DDLWorker::processTask(DDLTaskBase & task, const ZooKeeperPtr & zookeeper)
task.was_executed = true;
}
updateMaxDDLEntryID(task.entry_name);
/// Step 3: Create node in finished/ status dir and write execution status.
/// FIXME: if server fails right here, the task will be executed twice. We need WAL here.
/// NOTE: If ZooKeeper connection is lost here, we will try again to write query status.
@ -650,6 +648,7 @@ void DDLWorker::processTask(DDLTaskBase & task, const ZooKeeperPtr & zookeeper)
active_node->setAlreadyRemoved();
task.completely_processed = true;
updateMaxDDLEntryID(task.entry_name);
}

View File

@ -828,13 +828,14 @@ void StorageReplicatedMergeTree::drop()
throw Exception("Can't drop readonly replicated table (need to drop data in ZooKeeper as well)", ErrorCodes::TABLE_IS_READ_ONLY);
shutdown();
dropReplica(zookeeper, zookeeper_path, replica_name, log);
dropReplica(zookeeper, zookeeper_path, replica_name, log, getSettings());
}
dropAllData();
}
void StorageReplicatedMergeTree::dropReplica(zkutil::ZooKeeperPtr zookeeper, const String & zookeeper_path, const String & replica, Poco::Logger * logger)
void StorageReplicatedMergeTree::dropReplica(zkutil::ZooKeeperPtr zookeeper, const String & zookeeper_path, const String & replica,
Poco::Logger * logger, MergeTreeSettingsPtr table_settings)
{
if (zookeeper->expired())
throw Exception("Table was not dropped because ZooKeeper session has expired.", ErrorCodes::TABLE_WAS_NOT_DROPPED);
@ -871,12 +872,14 @@ void StorageReplicatedMergeTree::dropReplica(zkutil::ZooKeeperPtr zookeeper, con
assert(code == Coordination::Error::ZOK || code == Coordination::Error::ZNONODE);
/// Then try to remove paths that are known to be flat (all children are leafs)
Strings flat_nodes = {"flags", "parts", "queue"};
Strings flat_nodes = {"flags", "queue"};
if (table_settings && table_settings->use_minimalistic_part_header_in_zookeeper)
flat_nodes.emplace_back("parts");
for (const auto & node : flat_nodes)
{
bool removed_quickly = zookeeper->tryRemoveChildrenRecursive(fs::path(zookeeper_path) / node, /* probably flat */ true);
bool removed_quickly = zookeeper->tryRemoveChildrenRecursive(fs::path(remote_replica_path) / node, /* probably flat */ true);
if (!removed_quickly)
LOG_WARNING(logger, "Cannot quickly remove node {} and its children (replica: {}). Will remove recursively.",
LOG_WARNING(logger, "Failed to quickly remove node '{}' and its children, fell back to recursive removal (replica: {})",
node, remote_replica_path);
}
@ -899,7 +902,6 @@ void StorageReplicatedMergeTree::dropReplica(zkutil::ZooKeeperPtr zookeeper, con
LOG_WARNING(logger, "Cannot quickly remove nodes without children: {} (replica: {}). Will remove recursively.",
Coordination::errorMessage(code), remote_replica_path);
/// And finally remove everything else recursively
zookeeper->tryRemoveRecursive(remote_replica_path);
}
@ -962,14 +964,16 @@ bool StorageReplicatedMergeTree::removeTableNodesFromZooKeeper(zkutil::ZooKeeper
{
bool completely_removed = false;
Strings flat_nodes = {"block_numbers", "blocks", "leader_election", "log", "mutations", "pinned_part_uuids", "log"};
/// NOTE /block_numbers/ actually is not flat, because /block_numbers/<partition_id>/ may have ephemeral children,
/// but we assume that all ephemeral block locks are already removed when table is being dropped.
static constexpr std::array flat_nodes = {"block_numbers", "blocks", "leader_election", "log", "mutations", "pinned_part_uuids"};
/// First try to remove paths that are known to be flat
for (const auto & node : flat_nodes)
{
bool removed_quickly = zookeeper->tryRemoveChildrenRecursive(fs::path(zookeeper_path) / node, /* probably flat */ true);
if (!removed_quickly)
LOG_WARNING(logger, "Cannot quickly remove node {} and its children (table: {}). Will remove recursively.",
LOG_WARNING(logger, "Failed to quickly remove node '{}' and its children, fell back to recursive removal (table: {})",
node, zookeeper_path);
}
@ -982,11 +986,6 @@ bool StorageReplicatedMergeTree::removeTableNodesFromZooKeeper(zkutil::ZooKeeper
ops.emplace_back(zkutil::makeRemoveRequest(zookeeper_path + "/columns", -1));
ops.emplace_back(zkutil::makeRemoveRequest(zookeeper_path + "/metadata", -1));
ops.emplace_back(zkutil::makeRemoveRequest(zookeeper_path + "/table_shared_id", -1));
ops.emplace_back(zkutil::makeRemoveRequest(zookeeper_path + "/max_processed_insert_time", -1));
ops.emplace_back(zkutil::makeRemoveRequest(zookeeper_path + "/min_unprocessed_insert_time", -1));
ops.emplace_back(zkutil::makeRemoveRequest(zookeeper_path + "/metadata_version", -1));
ops.emplace_back(zkutil::makeRemoveRequest(zookeeper_path + "/mutation_pointer", -1));
ops.emplace_back(zkutil::makeRemoveRequest(zookeeper_path + "/table_shared_id", -1));
Coordination::Responses res;
auto code = zookeeper->tryMulti(ops, res);
if (code != Coordination::Error::ZOK)

View File

@ -214,7 +214,8 @@ public:
/** Remove a specific replica from zookeeper.
*/
static void dropReplica(zkutil::ZooKeeperPtr zookeeper, const String & zookeeper_path, const String & replica, Poco::Logger * logger);
static void dropReplica(zkutil::ZooKeeperPtr zookeeper, const String & zookeeper_path, const String & replica,
Poco::Logger * logger, MergeTreeSettingsPtr table_settings = nullptr);
/// Removes table from ZooKeeper after the last replica was dropped
static bool removeTableNodesFromZooKeeper(zkutil::ZooKeeperPtr zookeeper, const String & zookeeper_path,