mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-23 08:02:02 +00:00
CLICKHOUSE-3847 fix
This commit is contained in:
parent
9271ea7a5f
commit
ce217af78a
@ -95,11 +95,13 @@ void ReplicatedMergeTreeCleanupThread::clearOldLogs()
|
||||
: 0];
|
||||
|
||||
std::unordered_map<String, UInt32> host_versions_inactive_replicas;
|
||||
std::unordered_map<String, String> log_pointers_inactive_replicas;
|
||||
std::unordered_map<String, String> log_pointers_lost_replicas;
|
||||
|
||||
size_t replicas_were_marked_is_lost = 0;
|
||||
|
||||
for (const String & replica : replicas)
|
||||
{
|
||||
zkutil::Stat host_stat
|
||||
zkutil::Stat host_stat;
|
||||
zookeeper->get(storage.zookeeper_path + "/replicas/" + replica + "/host", &host_stat);
|
||||
String pointer = zookeeper->get(storage.zookeeper_path + "/replicas/" + replica + "/log_pointer");
|
||||
if (pointer.empty())
|
||||
@ -125,10 +127,12 @@ void ReplicatedMergeTreeCleanupThread::clearOldLogs()
|
||||
else
|
||||
{
|
||||
host_versions_inactive_replicas[replica] = host_stat.version;
|
||||
log_pointers_inactive_replicas[replica] = log_pointer_str;
|
||||
log_pointers_lost_replicas[replica] = log_pointer_str;
|
||||
}
|
||||
}
|
||||
/// Only to support old versions CH.
|
||||
else
|
||||
++replicas_were_marked_is_lost;
|
||||
host_versions_inactive_replicas[replica] = host_stat.version;
|
||||
}
|
||||
}
|
||||
@ -141,19 +145,19 @@ void ReplicatedMergeTreeCleanupThread::clearOldLogs()
|
||||
if (entries.empty())
|
||||
return;
|
||||
|
||||
markLostReplicas(host_versions_inactive_replicas, log_pointers_inactive_replicas, replicas.size(), entries.back(), zookeeper);
|
||||
markLostReplicas(host_versions_inactive_replicas, log_pointers_lost_replicas, replicas.size() - replicas_were_marked_is_lost, zookeeper);
|
||||
|
||||
zkutil::Requests ops;
|
||||
for (size_t i = 0; i < entries.size(); ++i)
|
||||
{
|
||||
ops.emplace_back(zkutil::makeRemoveRequest(storage.zookeeper_path + "/log/" + entries[i], -1));
|
||||
|
||||
/// we need to check this because the replica that was restored from one of the marked replicas does not copy a non-valid log_pointer.
|
||||
for (auto host_version: host_versions_inactive_replicas)
|
||||
ops.emplace_back(zkutil::makeCheckRequest(storage.zookeeper_path + "/replicas" + host_version.first + "/host", host_version.second));
|
||||
|
||||
if (ops.size() > 4 * zkutil::MULTI_BATCH_SIZE || i + 1 == entries.size())
|
||||
{
|
||||
/// we need to check this because the replica that was restored from one of the marked replicas does not copy a non-valid log_pointer.
|
||||
for (auto host_version: host_versions_inactive_replicas)
|
||||
ops.emplace_back(zkutil::makeCheckRequest(storage.zookeeper_path + "/replicas" + host_version.first + "/host", host_version.second));
|
||||
|
||||
/// Simultaneously with clearing the log, we check to see if replica was added since we received replicas list.
|
||||
ops.emplace_back(zkutil::makeCheckRequest(storage.zookeeper_path + "/replicas", stat.version));
|
||||
zookeeper->multi(ops);
|
||||
@ -166,38 +170,41 @@ void ReplicatedMergeTreeCleanupThread::clearOldLogs()
|
||||
|
||||
|
||||
void ReplicatedMergeTreeCleanupThread::markLostReplicas(const std::unordered_map<String, UInt32> & host_versions_inactive_replicas,
|
||||
const std::unordered_map<String, String> & log_pointers_inactive_replicas,
|
||||
size_t replicas_count, const String & remove_border, const zkutil::ZooKeeperPtr & zookeeper)
|
||||
const std::unordered_map<String, String> & log_pointers_lost_replicas,
|
||||
size_t replicas_count, const zkutil::ZooKeeperPtr & zookeeper)
|
||||
{
|
||||
std::vector<zkutil::Requests> requests;
|
||||
std::vector<zkutil::ZooKeeper::FutureMulti> futures;
|
||||
struct LostReplicaInfo {
|
||||
String name;
|
||||
zkutil::Requests requests;
|
||||
};
|
||||
|
||||
for (auto pair : log_pointers_inactive_replicas)
|
||||
std::vector<zkutil::Requests> requests;
|
||||
std::vector<LostReplicaInfo> lost_replicas_info;
|
||||
std::vector<std::pair<LostReplicaInfo, zkutil::ZooKeeper::FutureMulti>> info_and_future;
|
||||
|
||||
for (auto pair : log_pointers_lost_replicas)
|
||||
{
|
||||
String replica = pair.first;
|
||||
if (pair.second <= remove_border)
|
||||
{
|
||||
zkutil::Requests ops;
|
||||
/// If host changed version we can not mark replicas, because replica started to be active.
|
||||
ops.emplace_back(zkutil::makeCheckRequest(storage.zookeeper_path + "/replicas/" + replica + "/host", host_versions_inactive_replicas.at(replica)));
|
||||
ops.emplace_back(zkutil::makeSetRequest(storage.zookeeper_path + "/replicas/" + replica + "/is_lost", "1", -1));
|
||||
requests.push_back(ops);
|
||||
}
|
||||
zkutil::Requests ops;
|
||||
/// If host changed version we can not mark replicas, because replica started to be active.
|
||||
ops.emplace_back(zkutil::makeCheckRequest(storage.zookeeper_path + "/replicas/" + replica + "/host", host_versions_inactive_replicas.at(replica)));
|
||||
ops.emplace_back(zkutil::makeSetRequest(storage.zookeeper_path + "/replicas/" + replica + "/is_lost", "1", -1));
|
||||
lost_replicas_info.push_back(LostReplicaInfo{replica, ops});
|
||||
}
|
||||
|
||||
if (requests.size() == replicas_count)
|
||||
if (lost_replicas_info.size() == replicas_count)
|
||||
throw Exception("All replicas wiil be lost", ErrorCodes::ALL_REPLICAS_LOST);
|
||||
|
||||
for (auto & req : requests)
|
||||
futures.push_back(zookeeper->tryAsyncMulti(req));
|
||||
for (auto & replica_info : lost_replicas_info)
|
||||
info_and_future.emplace_back(replica_info, zookeeper->tryAsyncMulti(replica_info.requests));
|
||||
|
||||
for (size_t i = 0; i < futures.size(); ++i)
|
||||
for (auto & pair : info_and_future)
|
||||
{
|
||||
auto multi_responses = futures[i].get();
|
||||
auto multi_responses = pair.second.get();
|
||||
if (multi_responses.responses[0]->error == ZooKeeperImpl::ZooKeeper::ZBADVERSION)
|
||||
throw Exception("One of the replicas became active, when we clear log", DB::ErrorCodes::REPLICA_STATUS_CHANGED);
|
||||
throw Exception(pair.first.name + " became active, when we clear log", DB::ErrorCodes::REPLICA_STATUS_CHANGED);
|
||||
else if (multi_responses.error != ZooKeeperImpl::ZooKeeper::ZOK)
|
||||
zkutil::KeeperMultiException::check(multi_responses.error, requests[i], multi_responses.responses);
|
||||
zkutil::KeeperMultiException::check(multi_responses.error, pair.first.requests, multi_responses.responses);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -48,8 +48,8 @@ private:
|
||||
|
||||
/// Mark lost replicas.
|
||||
void markLostReplicas(const std::unordered_map<String, UInt32> & host_versions_inactive_replicas,
|
||||
const std::unordered_map<String, String> & log_pointers_inactive_replicas,
|
||||
size_t replicas_count, const String & remove_border, const zkutil::ZooKeeperPtr & zookeeper);
|
||||
const std::unordered_map<String, String> & log_pointers_lost_replicas,
|
||||
size_t replicas_count, const zkutil::ZooKeeperPtr & zookeeper);
|
||||
|
||||
/// Remove old block hashes from ZooKeeper. This is done by the leader replica.
|
||||
void clearOldBlocks();
|
||||
|
@ -26,8 +26,6 @@ namespace DB
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int REPLICA_IS_ALREADY_ACTIVE;
|
||||
extern const int ALL_REPLICAS_LOST;
|
||||
extern const int CAN_NOT_CLONE_REPLICA;
|
||||
}
|
||||
|
||||
namespace
|
||||
|
@ -617,7 +617,9 @@ void StorageReplicatedMergeTree::createReplica()
|
||||
code = zookeeper->tryMulti(ops, resps);
|
||||
if (code == ZooKeeperImpl::ZooKeeper::ZNODEEXISTS)
|
||||
throw Exception("Replica " + replica_path + " already exists.", ErrorCodes::REPLICA_IS_ALREADY_EXIST);
|
||||
else if (code != ZooKeeperImpl::ZooKeeper::ZBADVERSION)
|
||||
else if (code == ZooKeeperImpl::ZooKeeper::ZBADVERSION)
|
||||
LOG_ERROR(log, "Retry createReplica(), because some replicas were created");
|
||||
else
|
||||
zkutil::KeeperMultiException::check(code, ops, resps);
|
||||
} while (code == ZooKeeperImpl::ZooKeeper::ZBADVERSION);
|
||||
}
|
||||
@ -1982,6 +1984,7 @@ void StorageReplicatedMergeTree::cloneReplica(const String & source_replica, zku
|
||||
zkutil::Requests ops;
|
||||
ops.push_back(zkutil::makeSetRequest(replica_path + "/log_pointer", raw_log_pointer, -1));
|
||||
|
||||
/// For support old versions CH.
|
||||
if (source_is_lost_stat.version == -1)
|
||||
{
|
||||
ops.push_back(zkutil::makeCreateRequest(replica_path + "/is_lost", "0", zkutil::CreateMode::PersistentSequential));
|
||||
@ -1994,9 +1997,9 @@ void StorageReplicatedMergeTree::cloneReplica(const String & source_replica, zku
|
||||
|
||||
auto error = zookeeper->tryMulti(ops, resp);
|
||||
if (error == ZooKeeperImpl::ZooKeeper::ZBADVERSION)
|
||||
throw Exception("Can not clone replica, because a source replica is lost", ErrorCodes::REPLICA_STATUS_CHANGED);
|
||||
throw Exception("Can not clone replica, because a " + source_path + " became lost", ErrorCodes::REPLICA_STATUS_CHANGED);
|
||||
else if (error == ZooKeeperImpl::ZooKeeper::ZNODEEXISTS)
|
||||
throw Exception("ClickHouse server updated to new version", ErrorCodes::REPLICA_STATUS_CHANGED);
|
||||
throw Exception("Can not clone replica, because the clickHouse server updated to new version", ErrorCodes::REPLICA_STATUS_CHANGED);
|
||||
else
|
||||
zkutil::KeeperMultiException::check(error, ops, resp);
|
||||
|
||||
@ -2069,6 +2072,7 @@ void StorageReplicatedMergeTree::cloneReplicaIfNeeded(zkutil::ZooKeeperPtr zooke
|
||||
String resp;
|
||||
if (!zookeeper->tryGet(source_replica_path + "/is_lost", resp, &source_is_lost_stat) || resp == "0")
|
||||
source_replica = replica_name;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -401,8 +401,7 @@ private:
|
||||
void mutationsUpdatingTask();
|
||||
|
||||
/** Clone data from another replica.
|
||||
* return true, if replica wil be cloned
|
||||
* else return false (when source_replica was lost or log was empty).
|
||||
* If replica can not be cloned throw Exception.
|
||||
*/
|
||||
void cloneReplica(const String & source_replica, zkutil::Stat source_is_lost_stat, zkutil::ZooKeeperPtr & zookeeper);
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user