Don't hold a shared_ptr to ZooKeeper for a long time, because that makes it very difficult to remove ephemeral nodes in case of temporary communication errors and thus breaks leader election [#METR-23272].

This commit is contained in:
Alexey Milovidov 2016-10-24 15:34:08 +03:00
parent 6e49241120
commit 2dd43be6e6
3 changed files with 102 additions and 107 deletions

View File

@ -157,7 +157,7 @@ public:
* Если в процессе обработки было исключение - сохраняет его в entry. * Если в процессе обработки было исключение - сохраняет его в entry.
* Возвращает true, если в процессе обработки не было исключений. * Возвращает true, если в процессе обработки не было исключений.
*/ */
bool processEntry(zkutil::ZooKeeperPtr zookeeper, LogEntryPtr & entry, const std::function<bool(LogEntryPtr &)> func); bool processEntry(std::function<zkutil::ZooKeeperPtr()> get_zookeeper, LogEntryPtr & entry, const std::function<bool(LogEntryPtr &)> func);
/// Будет ли кусок в будущем слит в более крупный (или мерджи кусков в данном диапазоне запрещены)? /// Будет ли кусок в будущем слит в более крупный (или мерджи кусков в данном диапазоне запрещены)?
bool partWillBeMergedOrMergesDisabled(const String & part_name) const; bool partWillBeMergedOrMergesDisabled(const String & part_name) const;

View File

@ -577,14 +577,17 @@ ReplicatedMergeTreeQueue::SelectedEntry ReplicatedMergeTreeQueue::selectEntryToP
} }
bool ReplicatedMergeTreeQueue::processEntry(zkutil::ZooKeeperPtr zookeeper, LogEntryPtr & entry, const std::function<bool(LogEntryPtr &)> func) bool ReplicatedMergeTreeQueue::processEntry(
std::function<zkutil::ZooKeeperPtr()> get_zookeeper,
LogEntryPtr & entry,
const std::function<bool(LogEntryPtr &)> func)
{ {
std::exception_ptr saved_exception; std::exception_ptr saved_exception;
try try
{ {
if (func(entry)) if (func(entry))
remove(zookeeper, entry); remove(get_zookeeper(), entry);
} }
catch (...) catch (...)
{ {

View File

@ -1037,8 +1037,6 @@ void StorageReplicatedMergeTree::pullLogsToQueue(zkutil::EventPtr next_update_ev
bool StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry, BackgroundProcessingPool::Context & pool_context) bool StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry, BackgroundProcessingPool::Context & pool_context)
{ {
auto zookeeper = getZooKeeper();
if (entry.type == LogEntry::DROP_RANGE) if (entry.type == LogEntry::DROP_RANGE)
{ {
executeDropRange(entry); executeDropRange(entry);
@ -1053,7 +1051,7 @@ bool StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry, Backgro
MergeTreeData::DataPartPtr containing_part = data.getActiveContainingPart(entry.new_part_name); MergeTreeData::DataPartPtr containing_part = data.getActiveContainingPart(entry.new_part_name);
/// Даже если кусок есть локально, его (в исключительных случаях) может не быть в zookeeper. Проверим, что он там есть. /// Даже если кусок есть локально, его (в исключительных случаях) может не быть в zookeeper. Проверим, что он там есть.
if (containing_part && zookeeper->exists(replica_path + "/parts/" + containing_part->name)) if (containing_part && getZooKeeper()->exists(replica_path + "/parts/" + containing_part->name))
{ {
if (!(entry.type == LogEntry::GET_PART && entry.source_replica == replica_name)) if (!(entry.type == LogEntry::GET_PART && entry.source_replica == replica_name))
LOG_DEBUG(log, "Skipping action for part " << entry.new_part_name << " - part already exists."); LOG_DEBUG(log, "Skipping action for part " << entry.new_part_name << " - part already exists.");
@ -1065,7 +1063,7 @@ bool StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry, Backgro
LOG_WARNING(log, "Part " << entry.new_part_name << " from own log doesn't exist."); LOG_WARNING(log, "Part " << entry.new_part_name << " from own log doesn't exist.");
/// Возможно, этот кусок нам не нужен, так как при записи с кворумом, кворум пофейлился (см. ниже про /quorum/failed_parts). /// Возможно, этот кусок нам не нужен, так как при записи с кворумом, кворум пофейлился (см. ниже про /quorum/failed_parts).
if (entry.quorum && zookeeper->exists(zookeeper_path + "/quorum/failed_parts/" + entry.new_part_name)) if (entry.quorum && getZooKeeper()->exists(zookeeper_path + "/quorum/failed_parts/" + entry.new_part_name))
{ {
LOG_DEBUG(log, "Skipping action for part " << entry.new_part_name << " because quorum for that part was failed."); LOG_DEBUG(log, "Skipping action for part " << entry.new_part_name << " because quorum for that part was failed.");
return true; /// NOTE Удаление из virtual_parts не делается, но оно нужно только для мерджей. return true; /// NOTE Удаление из virtual_parts не делается, но оно нужно только для мерджей.
@ -1278,6 +1276,8 @@ bool StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry, Backgro
* Это позволит проследить, что реплики не стали активными. * Это позволит проследить, что реплики не стали активными.
*/ */
auto zookeeper = getZooKeeper();
Strings replicas = zookeeper->getChildren(zookeeper_path + "/replicas"); Strings replicas = zookeeper->getChildren(zookeeper_path + "/replicas");
zkutil::Ops ops; zkutil::Ops ops;
@ -1416,11 +1416,9 @@ bool StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry, Backgro
void StorageReplicatedMergeTree::executeDropRange(const StorageReplicatedMergeTree::LogEntry & entry) void StorageReplicatedMergeTree::executeDropRange(const StorageReplicatedMergeTree::LogEntry & entry)
{ {
auto zookeeper = getZooKeeper();
LOG_INFO(log, (entry.detach ? "Detaching" : "Removing") << " parts inside " << entry.new_part_name << "."); LOG_INFO(log, (entry.detach ? "Detaching" : "Removing") << " parts inside " << entry.new_part_name << ".");
queue.removeGetsAndMergesInRange(zookeeper, entry.new_part_name); queue.removeGetsAndMergesInRange(getZooKeeper(), entry.new_part_name);
LOG_DEBUG(log, (entry.detach ? "Detaching" : "Removing") << " parts."); LOG_DEBUG(log, (entry.detach ? "Detaching" : "Removing") << " parts.");
size_t removed_parts = 0; size_t removed_parts = 0;
@ -1445,7 +1443,7 @@ void StorageReplicatedMergeTree::executeDropRange(const StorageReplicatedMergeTr
zkutil::Ops ops; zkutil::Ops ops;
removePartFromZooKeeper(part->name, ops); removePartFromZooKeeper(part->name, ops);
auto code = zookeeper->tryMulti(ops); auto code = getZooKeeper()->tryMulti(ops);
/// Если кусок уже удалён (например, потому что он так и не был добавлен в ZK из-за сбоя, /// Если кусок уже удалён (например, потому что он так и не был добавлен в ZK из-за сбоя,
/// см. ReplicatedMergeTreeBlockOutputStream), то всё Ок. /// см. ReplicatedMergeTreeBlockOutputStream), то всё Ок.
@ -1463,8 +1461,6 @@ void StorageReplicatedMergeTree::executeDropRange(const StorageReplicatedMergeTr
bool StorageReplicatedMergeTree::executeAttachPart(const StorageReplicatedMergeTree::LogEntry & entry) bool StorageReplicatedMergeTree::executeAttachPart(const StorageReplicatedMergeTree::LogEntry & entry)
{ {
auto zookeeper = getZooKeeper();
String source_path = (entry.attach_unreplicated ? "unreplicated/" : "detached/") + entry.source_part_name; String source_path = (entry.attach_unreplicated ? "unreplicated/" : "detached/") + entry.source_part_name;
LOG_INFO(log, "Attaching part " << entry.source_part_name << " from " << source_path << " as " << entry.new_part_name); LOG_INFO(log, "Attaching part " << entry.source_part_name << " from " << source_path << " as " << entry.new_part_name);
@ -1490,7 +1486,7 @@ bool StorageReplicatedMergeTree::executeAttachPart(const StorageReplicatedMergeT
LOG_WARNING(log, "Unreplicated part " << entry.source_part_name << " is already detached"); LOG_WARNING(log, "Unreplicated part " << entry.source_part_name << " is already detached");
} }
zookeeper->multi(ops); getZooKeeper()->multi(ops);
/// NOTE: Не можем использовать renameTempPartAndAdd, потому что кусок не временный - если что-то пойдет не так, его не нужно удалять. /// NOTE: Не можем использовать renameTempPartAndAdd, потому что кусок не временный - если что-то пойдет не так, его не нужно удалять.
part->renameTo(entry.new_part_name); part->renameTo(entry.new_part_name);
@ -1559,7 +1555,7 @@ bool StorageReplicatedMergeTree::queueTask(BackgroundProcessingPool::Context & p
time_t prev_attempt_time = entry->last_attempt_time; time_t prev_attempt_time = entry->last_attempt_time;
bool res = queue.processEntry(getZooKeeper(), entry, [&](LogEntryPtr & entry) bool res = queue.processEntry([this]{ return getZooKeeper(); }, entry, [&](LogEntryPtr & entry)
{ {
try try
{ {
@ -2054,15 +2050,13 @@ bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const Strin
currently_fetching_parts.erase(part_name); currently_fetching_parts.erase(part_name);
); );
auto zookeeper = getZooKeeper();
LOG_DEBUG(log, "Fetching part " << part_name << " from " << replica_path); LOG_DEBUG(log, "Fetching part " << part_name << " from " << replica_path);
TableStructureReadLockPtr table_lock; TableStructureReadLockPtr table_lock;
if (!to_detached) if (!to_detached)
table_lock = lockStructure(true); table_lock = lockStructure(true);
ReplicatedMergeTreeAddress address(zookeeper->get(replica_path + "/host")); ReplicatedMergeTreeAddress address(getZooKeeper()->get(replica_path + "/host"));
MergeTreeData::MutableDataPartPtr part = fetcher.fetchPart( MergeTreeData::MutableDataPartPtr part = fetcher.fetchPart(
part_name, replica_path, address.host, address.replication_port, to_detached); part_name, replica_path, address.host, address.replication_port, to_detached);
@ -2081,7 +2075,7 @@ bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const Strin
MergeTreeData::Transaction transaction; MergeTreeData::Transaction transaction;
auto removed_parts = data.renameTempPartAndReplace(part, nullptr, &transaction); auto removed_parts = data.renameTempPartAndReplace(part, nullptr, &transaction);
zookeeper->multi(ops); getZooKeeper()->multi(ops);
transaction.commit(); transaction.commit();
/** Если для этого куска отслеживается кворум, то надо его обновить. /** Если для этого куска отслеживается кворум, то надо его обновить.
@ -2342,8 +2336,6 @@ bool StorageReplicatedMergeTree::optimize(const String & partition, bool final,
assertNotReadonly(); assertNotReadonly();
auto zookeeper = getZooKeeper();
if (!is_leader_node) if (!is_leader_node)
throw Exception("Method OPTIMIZE for ReplicatedMergeTree could be called only on leader replica", ErrorCodes::NOT_IMPLEMENTED); throw Exception("Method OPTIMIZE for ReplicatedMergeTree could be called only on leader replica", ErrorCodes::NOT_IMPLEMENTED);
@ -2394,7 +2386,6 @@ void StorageReplicatedMergeTree::alter(const AlterCommands & params,
{ {
assertNotReadonly(); assertNotReadonly();
auto zookeeper = getZooKeeper();
auto merge_blocker = merger.cancel(); auto merge_blocker = merger.cancel();
auto unreplicated_merge_blocker = unreplicated_merger ? auto unreplicated_merge_blocker = unreplicated_merger ?
unreplicated_merger->cancel() : MergeTreeDataMerger::Blocker(); unreplicated_merger->cancel() : MergeTreeDataMerger::Blocker();
@ -2433,7 +2424,7 @@ void StorageReplicatedMergeTree::alter(const AlterCommands & params,
}.toString(); }.toString();
/// Делаем ALTER. /// Делаем ALTER.
zookeeper->set(zookeeper_path + "/columns", new_columns_str, -1, &stat); getZooKeeper()->set(zookeeper_path + "/columns", new_columns_str, -1, &stat);
new_columns_version = stat.version; new_columns_version = stat.version;
} }
@ -2443,7 +2434,7 @@ void StorageReplicatedMergeTree::alter(const AlterCommands & params,
/// Ждем, пока все реплики обновят данные. /// Ждем, пока все реплики обновят данные.
/// Подпишемся на изменения столбцов, чтобы перестать ждать, если кто-то еще сделает ALTER. /// Подпишемся на изменения столбцов, чтобы перестать ждать, если кто-то еще сделает ALTER.
if (!zookeeper->exists(zookeeper_path + "/columns", &stat, alter_query_event)) if (!getZooKeeper()->exists(zookeeper_path + "/columns", &stat, alter_query_event))
throw Exception(zookeeper_path + "/columns doesn't exist", ErrorCodes::NOT_FOUND_NODE); throw Exception(zookeeper_path + "/columns doesn't exist", ErrorCodes::NOT_FOUND_NODE);
if (stat.version != new_columns_version) if (stat.version != new_columns_version)
@ -2453,7 +2444,7 @@ void StorageReplicatedMergeTree::alter(const AlterCommands & params,
return; return;
} }
Strings replicas = zookeeper->getChildren(zookeeper_path + "/replicas"); Strings replicas = getZooKeeper()->getChildren(zookeeper_path + "/replicas");
std::set<String> inactive_replicas; std::set<String> inactive_replicas;
std::set<String> timed_out_replicas; std::set<String> timed_out_replicas;
@ -2467,7 +2458,7 @@ void StorageReplicatedMergeTree::alter(const AlterCommands & params,
while (!shutdown_called) while (!shutdown_called)
{ {
/// Реплика может быть неактивной. /// Реплика может быть неактивной.
if (!zookeeper->exists(zookeeper_path + "/replicas/" + replica + "/is_active")) if (!getZooKeeper()->exists(zookeeper_path + "/replicas/" + replica + "/is_active"))
{ {
LOG_WARNING(log, "Replica " << replica << " is not active during ALTER query." LOG_WARNING(log, "Replica " << replica << " is not active during ALTER query."
" ALTER will be done asynchronously when replica becomes active."); " ALTER will be done asynchronously when replica becomes active.");
@ -2479,7 +2470,7 @@ void StorageReplicatedMergeTree::alter(const AlterCommands & params,
String replica_columns_str; String replica_columns_str;
/// Реплику могли успеть удалить. /// Реплику могли успеть удалить.
if (!zookeeper->tryGet(zookeeper_path + "/replicas/" + replica + "/columns", replica_columns_str, &stat)) if (!getZooKeeper()->tryGet(zookeeper_path + "/replicas/" + replica + "/columns", replica_columns_str, &stat))
{ {
LOG_WARNING(log, replica << " was removed"); LOG_WARNING(log, replica << " was removed");
break; break;
@ -2490,7 +2481,7 @@ void StorageReplicatedMergeTree::alter(const AlterCommands & params,
if (replica_columns_str == new_columns_str) if (replica_columns_str == new_columns_str)
break; break;
if (!zookeeper->exists(zookeeper_path + "/columns", &stat)) if (!getZooKeeper()->exists(zookeeper_path + "/columns", &stat))
throw Exception(zookeeper_path + "/columns doesn't exist", ErrorCodes::NOT_FOUND_NODE); throw Exception(zookeeper_path + "/columns doesn't exist", ErrorCodes::NOT_FOUND_NODE);
if (stat.version != new_columns_version) if (stat.version != new_columns_version)
@ -2500,7 +2491,7 @@ void StorageReplicatedMergeTree::alter(const AlterCommands & params,
return; return;
} }
if (!zookeeper->exists(zookeeper_path + "/replicas/" + replica + "/columns", &stat, alter_query_event)) if (!getZooKeeper()->exists(zookeeper_path + "/replicas/" + replica + "/columns", &stat, alter_query_event))
{ {
LOG_WARNING(log, replica << " was removed"); LOG_WARNING(log, replica << " was removed");
break; break;
@ -2625,24 +2616,23 @@ void StorageReplicatedMergeTree::dropPartition(
assertNotReadonly(); assertNotReadonly();
auto zookeeper = getZooKeeper();
String month_name = MergeTreeData::getMonthName(field); String month_name = MergeTreeData::getMonthName(field);
if (!is_leader_node) if (!is_leader_node)
{ {
/// Проксируем запрос в лидера. /// Проксируем запрос в лидера.
auto live_replicas = zookeeper->getChildren(zookeeper_path + "/leader_election"); auto live_replicas = getZooKeeper()->getChildren(zookeeper_path + "/leader_election");
if (live_replicas.empty()) if (live_replicas.empty())
throw Exception("No active replicas", ErrorCodes::NO_ACTIVE_REPLICAS); throw Exception("No active replicas", ErrorCodes::NO_ACTIVE_REPLICAS);
std::sort(live_replicas.begin(), live_replicas.end()); std::sort(live_replicas.begin(), live_replicas.end());
const auto leader = zookeeper->get(zookeeper_path + "/leader_election/" + live_replicas.front()); const auto leader = getZooKeeper()->get(zookeeper_path + "/leader_election/" + live_replicas.front());
if (leader == replica_name) if (leader == replica_name)
throw Exception("Leader was suddenly changed or logical error.", ErrorCodes::LEADERSHIP_CHANGED); throw Exception("Leader was suddenly changed or logical error.", ErrorCodes::LEADERSHIP_CHANGED);
ReplicatedMergeTreeAddress leader_address(zookeeper->get(zookeeper_path + "/replicas/" + leader + "/host")); ReplicatedMergeTreeAddress leader_address(getZooKeeper()->get(zookeeper_path + "/replicas/" + leader + "/host"));
auto new_query = query->clone(); auto new_query = query->clone();
auto & alter = typeid_cast<ASTAlterQuery &>(*new_query); auto & alter = typeid_cast<ASTAlterQuery &>(*new_query);
@ -2708,7 +2698,7 @@ void StorageReplicatedMergeTree::dropPartition(
entry.source_replica = replica_name; entry.source_replica = replica_name;
entry.new_part_name = fake_part_name; entry.new_part_name = fake_part_name;
entry.detach = detach; entry.detach = detach;
String log_znode_path = zookeeper->create(zookeeper_path + "/log/log-", entry.toString(), zkutil::CreateMode::PersistentSequential); String log_znode_path = getZooKeeper()->create(zookeeper_path + "/log/log-", entry.toString(), zkutil::CreateMode::PersistentSequential);
entry.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1); entry.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1);
entry.create_time = time(0); entry.create_time = time(0);
@ -2727,7 +2717,6 @@ void StorageReplicatedMergeTree::attachPartition(ASTPtr query, const Field & fie
{ {
assertNotReadonly(); assertNotReadonly();
auto zookeeper = getZooKeeper();
String partition; String partition;
if (attach_part) if (attach_part)
@ -2812,12 +2801,12 @@ void StorageReplicatedMergeTree::attachPartition(ASTPtr query, const Field & fie
entry.create_time = time(0); entry.create_time = time(0);
ops.push_back(new zkutil::Op::Create( ops.push_back(new zkutil::Op::Create(
zookeeper_path + "/log/log-", entry.toString(), zookeeper->getDefaultACL(), zkutil::CreateMode::PersistentSequential)); zookeeper_path + "/log/log-", entry.toString(), getZooKeeper()->getDefaultACL(), zkutil::CreateMode::PersistentSequential));
} }
LOG_DEBUG(log, "Adding attaches to log"); LOG_DEBUG(log, "Adding attaches to log");
zookeeper->multi(ops); getZooKeeper()->multi(ops);
/// Если надо - дожидаемся выполнения операции на себе или на всех репликах. /// Если надо - дожидаемся выполнения операции на себе или на всех репликах.
if (settings.replication_alter_partitions_sync != 0) if (settings.replication_alter_partitions_sync != 0)
@ -2841,26 +2830,28 @@ void StorageReplicatedMergeTree::attachPartition(ASTPtr query, const Field & fie
void StorageReplicatedMergeTree::drop() void StorageReplicatedMergeTree::drop()
{ {
auto zookeeper = tryGetZooKeeper();
if (is_readonly || !zookeeper)
throw Exception("Can't drop readonly replicated table (need to drop data in ZooKeeper as well)", ErrorCodes::TABLE_IS_READ_ONLY);
shutdown();
if (zookeeper->expired())
throw Exception("Table was not dropped because ZooKeeper session has expired.", ErrorCodes::TABLE_WAS_NOT_DROPPED);
LOG_INFO(log, "Removing replica " << replica_path);
replica_is_active_node = nullptr;
zookeeper->tryRemoveRecursive(replica_path);
/// Проверяем, что zookeeper_path существует: его могла удалить другая реплика после выполнения предыдущей строки.
Strings replicas;
if (zookeeper->tryGetChildren(zookeeper_path + "/replicas", replicas) == ZOK && replicas.empty())
{ {
LOG_INFO(log, "Removing table " << zookeeper_path << " (this might take several minutes)"); auto zookeeper = tryGetZooKeeper();
zookeeper->tryRemoveRecursive(zookeeper_path);
if (is_readonly || !zookeeper)
throw Exception("Can't drop readonly replicated table (need to drop data in ZooKeeper as well)", ErrorCodes::TABLE_IS_READ_ONLY);
shutdown();
if (zookeeper->expired())
throw Exception("Table was not dropped because ZooKeeper session has expired.", ErrorCodes::TABLE_WAS_NOT_DROPPED);
LOG_INFO(log, "Removing replica " << replica_path);
replica_is_active_node = nullptr;
zookeeper->tryRemoveRecursive(replica_path);
/// Проверяем, что zookeeper_path существует: его могла удалить другая реплика после выполнения предыдущей строки.
Strings replicas;
if (zookeeper->tryGetChildren(zookeeper_path + "/replicas", replicas) == ZOK && replicas.empty())
{
LOG_INFO(log, "Removing table " << zookeeper_path << " (this might take several minutes)");
zookeeper->tryRemoveRecursive(zookeeper_path);
}
} }
data.dropAllData(); data.dropAllData();
@ -2891,8 +2882,7 @@ bool StorageReplicatedMergeTree::existsNodeCached(const std::string & path)
return true; return true;
} }
auto zookeeper = getZooKeeper(); bool res = getZooKeeper()->exists(path);
bool res = zookeeper->exists(path);
if (res) if (res)
{ {
@ -2933,10 +2923,9 @@ AbandonableLockInZooKeeper StorageReplicatedMergeTree::allocateBlockNumber(const
void StorageReplicatedMergeTree::waitForAllReplicasToProcessLogEntry(const ReplicatedMergeTreeLogEntryData & entry) void StorageReplicatedMergeTree::waitForAllReplicasToProcessLogEntry(const ReplicatedMergeTreeLogEntryData & entry)
{ {
auto zookeeper = getZooKeeper();
LOG_DEBUG(log, "Waiting for all replicas to process " << entry.znode_name); LOG_DEBUG(log, "Waiting for all replicas to process " << entry.znode_name);
Strings replicas = zookeeper->getChildren(zookeeper_path + "/replicas"); Strings replicas = getZooKeeper()->getChildren(zookeeper_path + "/replicas");
for (const String & replica : replicas) for (const String & replica : replicas)
waitForReplicaToProcessLogEntry(replica, entry); waitForReplicaToProcessLogEntry(replica, entry);
@ -2946,8 +2935,6 @@ void StorageReplicatedMergeTree::waitForAllReplicasToProcessLogEntry(const Repli
void StorageReplicatedMergeTree::waitForReplicaToProcessLogEntry(const String & replica, const ReplicatedMergeTreeLogEntryData & entry) void StorageReplicatedMergeTree::waitForReplicaToProcessLogEntry(const String & replica, const ReplicatedMergeTreeLogEntryData & entry)
{ {
auto zookeeper = getZooKeeper();
String entry_str = entry.toString(); String entry_str = entry.toString();
String log_node_name; String log_node_name;
@ -2982,7 +2969,7 @@ void StorageReplicatedMergeTree::waitForReplicaToProcessLogEntry(const String &
{ {
zkutil::EventPtr event = std::make_shared<Poco::Event>(); zkutil::EventPtr event = std::make_shared<Poco::Event>();
String log_pointer = zookeeper->get(zookeeper_path + "/replicas/" + replica + "/log_pointer", nullptr, event); String log_pointer = getZooKeeper()->get(zookeeper_path + "/replicas/" + replica + "/log_pointer", nullptr, event);
if (!log_pointer.empty() && parse<UInt64>(log_pointer) > log_index) if (!log_pointer.empty() && parse<UInt64>(log_pointer) > log_index)
break; break;
@ -2995,9 +2982,9 @@ void StorageReplicatedMergeTree::waitForReplicaToProcessLogEntry(const String &
* ища ноду с таким же содержимым. И если мы её не найдём - значит реплика уже взяла эту запись в свою queue. * ища ноду с таким же содержимым. И если мы её не найдём - значит реплика уже взяла эту запись в свою queue.
*/ */
String log_pointer = zookeeper->get(zookeeper_path + "/replicas/" + replica + "/log_pointer"); String log_pointer = getZooKeeper()->get(zookeeper_path + "/replicas/" + replica + "/log_pointer");
Strings log_entries = zookeeper->getChildren(zookeeper_path + "/log"); Strings log_entries = getZooKeeper()->getChildren(zookeeper_path + "/log");
UInt64 log_index = 0; UInt64 log_index = 0;
bool found = false; bool found = false;
@ -3009,7 +2996,7 @@ void StorageReplicatedMergeTree::waitForReplicaToProcessLogEntry(const String &
continue; continue;
String log_entry_str; String log_entry_str;
bool exists = zookeeper->tryGet(zookeeper_path + "/log/" + log_entry_name, log_entry_str); bool exists = getZooKeeper()->tryGet(zookeeper_path + "/log/" + log_entry_name, log_entry_str);
if (exists && entry_str == log_entry_str) if (exists && entry_str == log_entry_str)
{ {
found = true; found = true;
@ -3027,7 +3014,7 @@ void StorageReplicatedMergeTree::waitForReplicaToProcessLogEntry(const String &
{ {
zkutil::EventPtr event = std::make_shared<Poco::Event>(); zkutil::EventPtr event = std::make_shared<Poco::Event>();
String log_pointer = zookeeper->get(zookeeper_path + "/replicas/" + replica + "/log_pointer", nullptr, event); String log_pointer = getZooKeeper()->get(zookeeper_path + "/replicas/" + replica + "/log_pointer", nullptr, event);
if (!log_pointer.empty() && parse<UInt64>(log_pointer) > log_index) if (!log_pointer.empty() && parse<UInt64>(log_pointer) > log_index)
break; break;
@ -3048,13 +3035,13 @@ void StorageReplicatedMergeTree::waitForReplicaToProcessLogEntry(const String &
* Поэтому, ищем путём сравнения содержимого. * Поэтому, ищем путём сравнения содержимого.
*/ */
Strings queue_entries = zookeeper->getChildren(zookeeper_path + "/replicas/" + replica + "/queue"); Strings queue_entries = getZooKeeper()->getChildren(zookeeper_path + "/replicas/" + replica + "/queue");
String queue_entry_to_wait_for; String queue_entry_to_wait_for;
for (const String & entry_name : queue_entries) for (const String & entry_name : queue_entries)
{ {
String queue_entry_str; String queue_entry_str;
bool exists = zookeeper->tryGet(zookeeper_path + "/replicas/" + replica + "/queue/" + entry_name, queue_entry_str); bool exists = getZooKeeper()->tryGet(zookeeper_path + "/replicas/" + replica + "/queue/" + entry_name, queue_entry_str);
if (exists && queue_entry_str == entry_str) if (exists && queue_entry_str == entry_str)
{ {
queue_entry_to_wait_for = entry_name; queue_entry_to_wait_for = entry_name;
@ -3072,7 +3059,7 @@ void StorageReplicatedMergeTree::waitForReplicaToProcessLogEntry(const String &
LOG_DEBUG(log, "Waiting for " << queue_entry_to_wait_for << " to disappear from " << replica << " queue"); LOG_DEBUG(log, "Waiting for " << queue_entry_to_wait_for << " to disappear from " << replica << " queue");
/// Третье - дождемся, пока запись исчезнет из очереди реплики. /// Третье - дождемся, пока запись исчезнет из очереди реплики.
zookeeper->waitForDisappear(zookeeper_path + "/replicas/" + replica + "/queue/" + queue_entry_to_wait_for); getZooKeeper()->waitForDisappear(zookeeper_path + "/replicas/" + replica + "/queue/" + queue_entry_to_wait_for);
} }
@ -3209,8 +3196,6 @@ void StorageReplicatedMergeTree::getReplicaDelays(time_t & out_absolute_delay, t
void StorageReplicatedMergeTree::fetchPartition(const Field & partition, const String & from_, const Settings & settings) void StorageReplicatedMergeTree::fetchPartition(const Field & partition, const String & from_, const Settings & settings)
{ {
auto zookeeper = getZooKeeper();
String partition_str = MergeTreeData::getMonthName(partition); String partition_str = MergeTreeData::getMonthName(partition);
String from = from_; String from = from_;
@ -3227,47 +3212,53 @@ void StorageReplicatedMergeTree::fetchPartition(const Field & partition, const S
if (startsWith(dir_it.name(), partition_str)) if (startsWith(dir_it.name(), partition_str))
throw Exception("Detached partition " + partition_str + " already exists.", ErrorCodes::PARTITION_ALREADY_EXISTS); throw Exception("Detached partition " + partition_str + " already exists.", ErrorCodes::PARTITION_ALREADY_EXISTS);
/// Список реплик шарда-источника. zkutil::Strings replicas;
zkutil::Strings replicas = zookeeper->getChildren(from + "/replicas");
/// Оставим только активные реплики.
zkutil::Strings active_replicas; zkutil::Strings active_replicas;
active_replicas.reserve(replicas.size());
for (const String & replica : replicas)
if (zookeeper->exists(from + "/replicas/" + replica + "/is_active"))
active_replicas.push_back(replica);
if (active_replicas.empty())
throw Exception("No active replicas for shard " + from, ErrorCodes::NO_ACTIVE_REPLICAS);
/** Надо выбрать лучшую (наиболее актуальную) реплику.
* Это реплика с максимальным log_pointer, затем с минимальным размером queue.
* NOTE Это не совсем лучший критерий. Для скачивания старых партиций это не имеет смысла,
* и было бы неплохо уметь выбирать реплику, ближайшую по сети.
* NOTE Разумеется, здесь есть data race-ы. Можно решить ретраями.
*/
Int64 max_log_pointer = -1;
UInt64 min_queue_size = std::numeric_limits<UInt64>::max();
String best_replica; String best_replica;
for (const String & replica : active_replicas)
{ {
String current_replica_path = from + "/replicas/" + replica; auto zookeeper = getZooKeeper();
String log_pointer_str = zookeeper->get(current_replica_path + "/log_pointer"); /// Список реплик шарда-источника.
Int64 log_pointer = log_pointer_str.empty() ? 0 : parse<UInt64>(log_pointer_str); replicas = zookeeper->getChildren(from + "/replicas");
zkutil::Stat stat; /// Оставим только активные реплики.
zookeeper->get(current_replica_path + "/queue", &stat); active_replicas.reserve(replicas.size());
size_t queue_size = stat.numChildren;
if (log_pointer > max_log_pointer for (const String & replica : replicas)
|| (log_pointer == max_log_pointer && queue_size < min_queue_size)) if (zookeeper->exists(from + "/replicas/" + replica + "/is_active"))
active_replicas.push_back(replica);
if (active_replicas.empty())
throw Exception("No active replicas for shard " + from, ErrorCodes::NO_ACTIVE_REPLICAS);
/** Надо выбрать лучшую (наиболее актуальную) реплику.
* Это реплика с максимальным log_pointer, затем с минимальным размером queue.
* NOTE Это не совсем лучший критерий. Для скачивания старых партиций это не имеет смысла,
* и было бы неплохо уметь выбирать реплику, ближайшую по сети.
* NOTE Разумеется, здесь есть data race-ы. Можно решить ретраями.
*/
Int64 max_log_pointer = -1;
UInt64 min_queue_size = std::numeric_limits<UInt64>::max();
for (const String & replica : active_replicas)
{ {
max_log_pointer = log_pointer; String current_replica_path = from + "/replicas/" + replica;
min_queue_size = queue_size;
best_replica = replica; String log_pointer_str = zookeeper->get(current_replica_path + "/log_pointer");
Int64 log_pointer = log_pointer_str.empty() ? 0 : parse<UInt64>(log_pointer_str);
zkutil::Stat stat;
zookeeper->get(current_replica_path + "/queue", &stat);
size_t queue_size = stat.numChildren;
if (log_pointer > max_log_pointer
|| (log_pointer == max_log_pointer && queue_size < min_queue_size))
{
max_log_pointer = log_pointer;
min_queue_size = queue_size;
best_replica = replica;
}
} }
} }
@ -3296,7 +3287,7 @@ void StorageReplicatedMergeTree::fetchPartition(const Field & partition, const S
if (try_no >= 5) if (try_no >= 5)
throw Exception("Too much retries to fetch parts from " + best_replica_path, ErrorCodes::TOO_MUCH_RETRIES_TO_FETCH_PARTS); throw Exception("Too much retries to fetch parts from " + best_replica_path, ErrorCodes::TOO_MUCH_RETRIES_TO_FETCH_PARTS);
Strings parts = zookeeper->getChildren(best_replica_path + "/parts"); Strings parts = getZooKeeper()->getChildren(best_replica_path + "/parts");
ActiveDataPartSet active_parts_set(parts); ActiveDataPartSet active_parts_set(parts);
Strings parts_to_fetch; Strings parts_to_fetch;
@ -3683,9 +3674,10 @@ StorageReplicatedMergeTree::gatherReplicaSpaceInfo(const WeightedZooKeeperPaths
local_space_info.factor = 1.1; local_space_info.factor = 1.1;
local_space_info.available_size = DiskSpaceMonitor::getUnreservedFreeSpace(full_path); local_space_info.available_size = DiskSpaceMonitor::getUnreservedFreeSpace(full_path);
auto zookeeper = getZooKeeper();
for (const auto & weighted_path : weighted_zookeeper_paths) for (const auto & weighted_path : weighted_zookeeper_paths)
{ {
auto zookeeper = getZooKeeper();
const auto & path = weighted_path.first; const auto & path = weighted_path.first;
UInt64 weight = weighted_path.second; UInt64 weight = weighted_path.second;