fix metadata_version in keeper

This commit is contained in:
Michael Stetsyuk 2024-09-04 18:00:03 +00:00
parent 1c839ff4b9
commit 32cfdc98b2
4 changed files with 57 additions and 1 deletions

View File

@ -2128,6 +2128,7 @@ ReplicatedMergeTreeQueue::Status ReplicatedMergeTreeQueue::getStatus() const
res.inserts_in_queue = 0; res.inserts_in_queue = 0;
res.merges_in_queue = 0; res.merges_in_queue = 0;
res.part_mutations_in_queue = 0; res.part_mutations_in_queue = 0;
res.metadata_alters_in_queue = 0;
res.queue_oldest_time = 0; res.queue_oldest_time = 0;
res.inserts_oldest_time = 0; res.inserts_oldest_time = 0;
res.merges_oldest_time = 0; res.merges_oldest_time = 0;
@ -2170,6 +2171,11 @@ ReplicatedMergeTreeQueue::Status ReplicatedMergeTreeQueue::getStatus() const
res.oldest_part_to_mutate_to = entry->new_part_name; res.oldest_part_to_mutate_to = entry->new_part_name;
} }
} }
if (entry->type == LogEntry::ALTER_METADATA)
{
++res.metadata_alters_in_queue;
}
} }
return res; return res;

View File

@ -453,6 +453,7 @@ public:
UInt32 inserts_in_queue; UInt32 inserts_in_queue;
UInt32 merges_in_queue; UInt32 merges_in_queue;
UInt32 part_mutations_in_queue; UInt32 part_mutations_in_queue;
UInt32 metadata_alters_in_queue;
UInt32 queue_oldest_time; UInt32 queue_oldest_time;
UInt32 inserts_oldest_time; UInt32 inserts_oldest_time;
UInt32 merges_oldest_time; UInt32 merges_oldest_time;

View File

@ -6155,6 +6155,8 @@ void StorageReplicatedMergeTree::alter(
mutation_znode.reset(); mutation_znode.reset();
auto current_metadata = getInMemoryMetadataPtr(); auto current_metadata = getInMemoryMetadataPtr();
// update metadata's metadata_version
// fixReplicaMetadataVersionIfNeeded(current_metadata->metadata_version);
StorageInMemoryMetadata future_metadata = *current_metadata; StorageInMemoryMetadata future_metadata = *current_metadata;
commands.apply(future_metadata, query_context); commands.apply(future_metadata, query_context);
@ -6200,7 +6202,8 @@ void StorageReplicatedMergeTree::alter(
size_t mutation_path_idx = std::numeric_limits<size_t>::max(); size_t mutation_path_idx = std::numeric_limits<size_t>::max();
String new_metadata_str = future_metadata_in_zk.toString(); String new_metadata_str = future_metadata_in_zk.toString();
ops.emplace_back(zkutil::makeSetRequest(fs::path(zookeeper_path) / "metadata", new_metadata_str, current_metadata->getMetadataVersion())); Int32 metadata_version = fixMetadataVersionInZooKeeper();
ops.emplace_back(zkutil::makeSetRequest(fs::path(zookeeper_path) / "metadata", new_metadata_str, metadata_version));
String new_columns_str = future_metadata.columns.toString(); String new_columns_str = future_metadata.columns.toString();
ops.emplace_back(zkutil::makeSetRequest(fs::path(zookeeper_path) / "columns", new_columns_str, -1)); ops.emplace_back(zkutil::makeSetRequest(fs::path(zookeeper_path) / "columns", new_columns_str, -1));
@ -10641,4 +10644,49 @@ template std::optional<EphemeralLockInZooKeeper> StorageReplicatedMergeTree::all
const std::vector<String> & zookeeper_block_id_path, const std::vector<String> & zookeeper_block_id_path,
const String & zookeeper_path_prefix) const; const String & zookeeper_path_prefix) const;
Int32 StorageReplicatedMergeTree::tryFixMetadataVersionInZooKeeper()
{
const Int32 metadata_version = getInMemoryMetadataPtr()->getMetadataVersion();
if (metadata_version != 0)
{
/// No need to fix anything
return metadata_version;
}
auto zookeeper = getZooKeeper();
Coordination::Stat stat;
zookeeper->get(fs::path(zookeeper_path) / "metadata", &stat);
if (stat.version == 0)
{
/// No need to fix anything
return metadata_version;
}
queue.pullLogsToQueue(zookeeper);
if (queue.getStatus().metadata_alters_in_queue != 0)
{
LOG_DEBUG(log, "No need to update metadata_version as there are ALTER_METADATA entries in the queue");
return metadata_version;
}
const Coordination::Requests ops = {
zkutil::makeSetRequest(fs::path(replica_path) / "metadata_version", std::to_string(stat.version), 0),
zkutil::makeCheckRequest(fs::path(zookeeper_path) / "metadata", stat.version),
};
Coordination::Responses ops_responses;
const auto code = current_zookeeper->tryMulti(ops, ops_responses);
if (code == Coordination::Error::ZOK)
{
LOG_DEBUG(log, "Successfully fixed metadata_version");
return stat.version;
}
if (code == Coordination::Error::ZBADVERSION)
{
LOG_DEBUG(log, "No need to update metadata_version because table metadata has been updated on a different replica");
return metadata_version;
}
throw zkutil::KeeperException(code);
}
} }

View File

@ -1013,6 +1013,7 @@ private:
DataPartsVector::const_iterator it; DataPartsVector::const_iterator it;
}; };
Int32 tryFixMetadataVersionInZooKeeper();
}; };
String getPartNamePossiblyFake(MergeTreeDataFormatVersion format_version, const MergeTreePartInfo & part_info); String getPartNamePossiblyFake(MergeTreeDataFormatVersion format_version, const MergeTreePartInfo & part_info);