mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-19 16:20:50 +00:00
better
This commit is contained in:
parent
c9aedee24f
commit
be55e1d2e1
@ -15,6 +15,7 @@ namespace ErrorCodes
|
|||||||
{
|
{
|
||||||
extern const int SUPPORT_IS_DISABLED;
|
extern const int SUPPORT_IS_DISABLED;
|
||||||
extern const int REPLICA_STATUS_CHANGED;
|
extern const int REPLICA_STATUS_CHANGED;
|
||||||
|
extern const int LOGICAL_ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
ReplicatedMergeTreeAttachThread::ReplicatedMergeTreeAttachThread(StorageReplicatedMergeTree & storage_)
|
ReplicatedMergeTreeAttachThread::ReplicatedMergeTreeAttachThread(StorageReplicatedMergeTree & storage_)
|
||||||
@ -117,6 +118,66 @@ void ReplicatedMergeTreeAttachThread::checkHasReplicaMetadataInZooKeeper(const z
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Int32 ReplicatedMergeTreeAttachThread::fixReplicaMetadataVersionIfNeeded(zkutil::ZooKeeperPtr zookeeper)
|
||||||
|
{
|
||||||
|
const String & zookeeper_path = storage.zookeeper_path;
|
||||||
|
const String & replica_path = storage.replica_path;
|
||||||
|
|
||||||
|
for (size_t i = 0; i != 2; ++i)
|
||||||
|
{
|
||||||
|
String replica_metadata_version_str;
|
||||||
|
const bool replica_metadata_version_exists = zookeeper->tryGet(replica_path + "/metadata_version", replica_metadata_version_str);
|
||||||
|
if (!replica_metadata_version_exists)
|
||||||
|
return -1;
|
||||||
|
|
||||||
|
const Int32 metadata_version = parse<Int32>(replica_metadata_version_str);
|
||||||
|
|
||||||
|
if (metadata_version != 0)
|
||||||
|
{
|
||||||
|
/// No need to fix anything
|
||||||
|
return metadata_version;
|
||||||
|
}
|
||||||
|
|
||||||
|
Coordination::Stat stat;
|
||||||
|
zookeeper->get(fs::path(zookeeper_path) / "metadata", &stat);
|
||||||
|
if (stat.version == 0)
|
||||||
|
{
|
||||||
|
/// No need to fix anything
|
||||||
|
return metadata_version;
|
||||||
|
}
|
||||||
|
|
||||||
|
ReplicatedMergeTreeQueue & queue = storage.queue;
|
||||||
|
queue.pullLogsToQueue(zookeeper);
|
||||||
|
if (queue.getStatus().metadata_alters_in_queue != 0)
|
||||||
|
{
|
||||||
|
LOG_DEBUG(log, "No need to update metadata_version as there are ALTER_METADATA entries in the queue");
|
||||||
|
return metadata_version;
|
||||||
|
}
|
||||||
|
|
||||||
|
const Coordination::Requests ops = {
|
||||||
|
zkutil::makeSetRequest(fs::path(replica_path) / "metadata_version", std::to_string(stat.version), 0),
|
||||||
|
zkutil::makeCheckRequest(fs::path(zookeeper_path) / "metadata", stat.version),
|
||||||
|
};
|
||||||
|
Coordination::Responses ops_responses;
|
||||||
|
const auto code = zookeeper->tryMulti(ops, ops_responses);
|
||||||
|
if (code == Coordination::Error::ZOK)
|
||||||
|
{
|
||||||
|
LOG_DEBUG(log, "Successfully set metadata_version to {}", stat.version);
|
||||||
|
return stat.version;
|
||||||
|
}
|
||||||
|
if (code != Coordination::Error::ZBADVERSION)
|
||||||
|
{
|
||||||
|
throw zkutil::KeeperException(code);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Second attempt is only possible if metadata_version != 0 or metadata.version changed during the first attempt.
|
||||||
|
/// If metadata_version != 0, on second attempt we will return the new metadata_version.
|
||||||
|
/// If metadata.version changed, on second attempt we will either get metadata_version != 0 and return the new metadata_version or we will get metadata_alters_in_queue != 0 and return 0.
|
||||||
|
/// Either way, on second attempt this method should return.
|
||||||
|
throw Exception(ErrorCodes::LOGICAL_ERROR, "Failed to fix replica metadata_version in ZooKeeper after two attempts");
|
||||||
|
}
|
||||||
|
|
||||||
void ReplicatedMergeTreeAttachThread::runImpl()
|
void ReplicatedMergeTreeAttachThread::runImpl()
|
||||||
{
|
{
|
||||||
storage.setZooKeeper();
|
storage.setZooKeeper();
|
||||||
@ -160,11 +221,11 @@ void ReplicatedMergeTreeAttachThread::runImpl()
|
|||||||
/// Just in case it was not removed earlier due to connection loss
|
/// Just in case it was not removed earlier due to connection loss
|
||||||
zookeeper->tryRemove(replica_path + "/flags/force_restore_data");
|
zookeeper->tryRemove(replica_path + "/flags/force_restore_data");
|
||||||
|
|
||||||
String replica_metadata_version;
|
const Int32 replica_metadata_version = fixReplicaMetadataVersionIfNeeded(zookeeper);
|
||||||
const bool replica_metadata_version_exists = zookeeper->tryGet(replica_path + "/metadata_version", replica_metadata_version);
|
const bool replica_metadata_version_exists = replica_metadata_version != -1;
|
||||||
if (replica_metadata_version_exists)
|
if (replica_metadata_version_exists)
|
||||||
{
|
{
|
||||||
storage.setInMemoryMetadata(metadata_snapshot->withMetadataVersion(parse<int>(replica_metadata_version)));
|
storage.setInMemoryMetadata(metadata_snapshot->withMetadataVersion(replica_metadata_version));
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
|
@ -48,6 +48,8 @@ private:
|
|||||||
void runImpl();
|
void runImpl();
|
||||||
|
|
||||||
void finalizeInitialization();
|
void finalizeInitialization();
|
||||||
|
|
||||||
|
Int32 fixReplicaMetadataVersionIfNeeded(zkutil::ZooKeeperPtr zookeeper);
|
||||||
};
|
};
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -6158,8 +6158,6 @@ void StorageReplicatedMergeTree::alter(
|
|||||||
mutation_znode.reset();
|
mutation_znode.reset();
|
||||||
|
|
||||||
auto current_metadata = getInMemoryMetadataPtr();
|
auto current_metadata = getInMemoryMetadataPtr();
|
||||||
// update metadata's metadata_version
|
|
||||||
// fixReplicaMetadataVersionIfNeeded(current_metadata->metadata_version);
|
|
||||||
|
|
||||||
StorageInMemoryMetadata future_metadata = *current_metadata;
|
StorageInMemoryMetadata future_metadata = *current_metadata;
|
||||||
commands.apply(future_metadata, query_context);
|
commands.apply(future_metadata, query_context);
|
||||||
@ -6205,8 +6203,7 @@ void StorageReplicatedMergeTree::alter(
|
|||||||
size_t mutation_path_idx = std::numeric_limits<size_t>::max();
|
size_t mutation_path_idx = std::numeric_limits<size_t>::max();
|
||||||
|
|
||||||
String new_metadata_str = future_metadata_in_zk.toString();
|
String new_metadata_str = future_metadata_in_zk.toString();
|
||||||
Int32 metadata_version = fixMetadataVersionInZooKeeper();
|
ops.emplace_back(zkutil::makeSetRequest(fs::path(zookeeper_path) / "metadata", new_metadata_str, current_metadata->getMetadataVersion()));
|
||||||
ops.emplace_back(zkutil::makeSetRequest(fs::path(zookeeper_path) / "metadata", new_metadata_str, metadata_version));
|
|
||||||
|
|
||||||
String new_columns_str = future_metadata.columns.toString();
|
String new_columns_str = future_metadata.columns.toString();
|
||||||
ops.emplace_back(zkutil::makeSetRequest(fs::path(zookeeper_path) / "columns", new_columns_str, -1));
|
ops.emplace_back(zkutil::makeSetRequest(fs::path(zookeeper_path) / "columns", new_columns_str, -1));
|
||||||
@ -10690,49 +10687,4 @@ template std::optional<EphemeralLockInZooKeeper> StorageReplicatedMergeTree::all
|
|||||||
const std::vector<String> & zookeeper_block_id_path,
|
const std::vector<String> & zookeeper_block_id_path,
|
||||||
const String & zookeeper_path_prefix) const;
|
const String & zookeeper_path_prefix) const;
|
||||||
|
|
||||||
Int32 StorageReplicatedMergeTree::tryFixMetadataVersionInZooKeeper()
|
|
||||||
{
|
|
||||||
const Int32 metadata_version = getInMemoryMetadataPtr()->getMetadataVersion();
|
|
||||||
if (metadata_version != 0)
|
|
||||||
{
|
|
||||||
/// No need to fix anything
|
|
||||||
return metadata_version;
|
|
||||||
}
|
|
||||||
|
|
||||||
auto zookeeper = getZooKeeper();
|
|
||||||
|
|
||||||
Coordination::Stat stat;
|
|
||||||
zookeeper->get(fs::path(zookeeper_path) / "metadata", &stat);
|
|
||||||
if (stat.version == 0)
|
|
||||||
{
|
|
||||||
/// No need to fix anything
|
|
||||||
return metadata_version;
|
|
||||||
}
|
|
||||||
|
|
||||||
queue.pullLogsToQueue(zookeeper);
|
|
||||||
if (queue.getStatus().metadata_alters_in_queue != 0)
|
|
||||||
{
|
|
||||||
LOG_DEBUG(log, "No need to update metadata_version as there are ALTER_METADATA entries in the queue");
|
|
||||||
return metadata_version;
|
|
||||||
}
|
|
||||||
|
|
||||||
const Coordination::Requests ops = {
|
|
||||||
zkutil::makeSetRequest(fs::path(replica_path) / "metadata_version", std::to_string(stat.version), 0),
|
|
||||||
zkutil::makeCheckRequest(fs::path(zookeeper_path) / "metadata", stat.version),
|
|
||||||
};
|
|
||||||
Coordination::Responses ops_responses;
|
|
||||||
const auto code = current_zookeeper->tryMulti(ops, ops_responses);
|
|
||||||
if (code == Coordination::Error::ZOK)
|
|
||||||
{
|
|
||||||
LOG_DEBUG(log, "Successfully fixed metadata_version");
|
|
||||||
return stat.version;
|
|
||||||
}
|
|
||||||
if (code == Coordination::Error::ZBADVERSION)
|
|
||||||
{
|
|
||||||
LOG_DEBUG(log, "No need to update metadata_version because table metadata has been updated on a different replica");
|
|
||||||
return metadata_version;
|
|
||||||
}
|
|
||||||
throw zkutil::KeeperException(code);
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -1026,8 +1026,6 @@ private:
|
|||||||
const bool & zero_copy_enabled,
|
const bool & zero_copy_enabled,
|
||||||
const bool & always_use_copy_instead_of_hardlinks,
|
const bool & always_use_copy_instead_of_hardlinks,
|
||||||
const ContextPtr & query_context);
|
const ContextPtr & query_context);
|
||||||
|
|
||||||
Int32 tryFixMetadataVersionInZooKeeper();
|
|
||||||
};
|
};
|
||||||
|
|
||||||
String getPartNamePossiblyFake(MergeTreeDataFormatVersion format_version, const MergeTreePartInfo & part_info);
|
String getPartNamePossiblyFake(MergeTreeDataFormatVersion format_version, const MergeTreePartInfo & part_info);
|
||||||
|
Loading…
Reference in New Issue
Block a user