From 09076b30d5a4e22bfab0dfb09441e26f34389e1b Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Fri, 12 Dec 2014 23:50:32 +0300 Subject: [PATCH] Merge --- .../ReplicatedMergeTreeBlockOutputStream.h | 24 +++---- .../DB/Storages/StorageReplicatedMergeTree.h | 16 ++++- .../ReplicatedMergeTreeCleanupThread.cpp | 25 +++++--- .../ReplicatedMergeTreeRestartingThread.cpp | 21 +++--- .../Storages/StorageReplicatedMergeTree.cpp | 64 ++++++++++++++++--- 5 files changed, 111 insertions(+), 39 deletions(-) diff --git a/dbms/include/DB/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h b/dbms/include/DB/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h index 5694ef60e60..286e8a32773 100644 --- a/dbms/include/DB/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h +++ b/dbms/include/DB/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h @@ -22,12 +22,14 @@ public: void write(const Block & block) override { - assertSessionIsNotExpired(); + auto zookeeper = storage.getZooKeeper(); + + assertSessionIsNotExpired(zookeeper); auto part_blocks = storage.writer.splitBlockIntoParts(block); for (auto & current_block : part_blocks) { - assertSessionIsNotExpired(); + assertSessionIsNotExpired(zookeeper); ++block_index; String block_id = insert_id.empty() ? "" : insert_id + "__" + toString(block_index); @@ -60,29 +62,29 @@ public: ops.push_back(new zkutil::Op::Create( storage.zookeeper_path + "/blocks/" + block_id, "", - storage.zookeeper->getDefaultACL(), + zookeeper->getDefaultACL(), zkutil::CreateMode::Persistent)); ops.push_back(new zkutil::Op::Create( storage.zookeeper_path + "/blocks/" + block_id + "/columns", part->columns.toString(), - storage.zookeeper->getDefaultACL(), + zookeeper->getDefaultACL(), zkutil::CreateMode::Persistent)); ops.push_back(new zkutil::Op::Create( storage.zookeeper_path + "/blocks/" + block_id + "/checksums", part->checksums.toString(), - storage.zookeeper->getDefaultACL(), + zookeeper->getDefaultACL(), zkutil::CreateMode::Persistent)); ops.push_back(new zkutil::Op::Create( storage.zookeeper_path + "/blocks/" + block_id + "/number", toString(part_number), - storage.zookeeper->getDefaultACL(), + zookeeper->getDefaultACL(), zkutil::CreateMode::Persistent)); } storage.checkPartAndAddToZooKeeper(part, ops, part_name); ops.push_back(new zkutil::Op::Create( storage.zookeeper_path + "/log/log-", log_entry.toString(), - storage.zookeeper->getDefaultACL(), + zookeeper->getDefaultACL(), zkutil::CreateMode::PersistentSequential)); block_number_lock.getUnlockOps(ops); @@ -91,7 +93,7 @@ public: try { - auto code = storage.zookeeper->tryMulti(ops); + auto code = zookeeper->tryMulti(ops); if (code == ZOK) { transaction.commit(); @@ -101,7 +103,7 @@ public: { /// Если блок с таким ID уже есть в таблице, откатим его вставку. String expected_checksums_str; - if (!block_id.empty() && storage.zookeeper->tryGet( + if (!block_id.empty() && zookeeper->tryGet( storage.zookeeper_path + "/blocks/" + block_id + "/checksums", expected_checksums_str)) { LOG_INFO(log, "Block with ID " << block_id << " already exists; ignoring it (removing part " << part->name << ")"); @@ -149,9 +151,9 @@ private: /// Позволяет проверить, что сессия в ZooKeeper ещё жива. - void assertSessionIsNotExpired() + void assertSessionIsNotExpired(zkutil::ZooKeeperPtr & zookeeper) { - if (storage.zookeeper->expired()) + if (zookeeper->expired()) throw Exception("ZooKeeper session has been expired.", ErrorCodes::NO_ZOOKEEPER); } }; diff --git a/dbms/include/DB/Storages/StorageReplicatedMergeTree.h b/dbms/include/DB/Storages/StorageReplicatedMergeTree.h index f4eb94321b0..a4c55d355b5 100644 --- a/dbms/include/DB/Storages/StorageReplicatedMergeTree.h +++ b/dbms/include/DB/Storages/StorageReplicatedMergeTree.h @@ -146,7 +146,21 @@ private: typedef std::list StringList; Context & context; - zkutil::ZooKeeperPtr zookeeper; + + zkutil::ZooKeeperPtr current_zookeeper; /// Используйте только с помощью методов ниже. + std::mutex current_zookeeper_mutex; /// Для пересоздания сессии в фоновом потоке. + + zkutil::ZooKeeperPtr getZooKeeper() + { + std::lock_guard lock(current_zookeeper_mutex); + return current_zookeeper; + } + + void setZooKeeper(zkutil::ZooKeeperPtr zookeeper) + { + std::lock_guard lock(current_zookeeper_mutex); + current_zookeeper = zookeeper; + } /// Если true, таблица в офлайновом режиме, и в нее нельзя писать. bool is_readonly = false; diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp index 6076fce92b2..7e73743aa23 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp @@ -51,6 +51,7 @@ void ReplicatedMergeTreeCleanupThread::iterate() void ReplicatedMergeTreeCleanupThread::clearOldParts() { auto table_lock = storage.lockStructure(false); + auto zookeeper = storage.getZooKeeper(); MergeTreeData::DataPartsVector parts = storage.data.grabOldParts(); size_t count = parts.size(); @@ -73,7 +74,7 @@ void ReplicatedMergeTreeCleanupThread::clearOldParts() ops.push_back(new zkutil::Op::Remove(storage.replica_path + "/parts/" + part->name + "/columns", -1)); ops.push_back(new zkutil::Op::Remove(storage.replica_path + "/parts/" + part->name + "/checksums", -1)); ops.push_back(new zkutil::Op::Remove(storage.replica_path + "/parts/" + part->name, -1)); - auto code = storage.zookeeper->tryMulti(ops); + auto code = zookeeper->tryMulti(ops); if (code != ZOK) LOG_WARNING(log, "Couldn't remove " << part->name << " from ZooKeeper: " << zkutil::ZooKeeper::error2string(code)); @@ -94,8 +95,10 @@ void ReplicatedMergeTreeCleanupThread::clearOldParts() void ReplicatedMergeTreeCleanupThread::clearOldLogs() { + auto zookeeper = storage.getZooKeeper(); + zkutil::Stat stat; - if (!storage.zookeeper->exists(storage.zookeeper_path + "/log", &stat)) + if (!zookeeper->exists(storage.zookeeper_path + "/log", &stat)) throw Exception(storage.zookeeper_path + "/log doesn't exist", ErrorCodes::NOT_FOUND_NODE); int children_count = stat.numChildren; @@ -104,17 +107,17 @@ void ReplicatedMergeTreeCleanupThread::clearOldLogs() if (static_cast(children_count) < storage.data.settings.replicated_logs_to_keep * 1.1) return; - Strings replicas = storage.zookeeper->getChildren(storage.zookeeper_path + "/replicas", &stat); + Strings replicas = zookeeper->getChildren(storage.zookeeper_path + "/replicas", &stat); UInt64 min_pointer = std::numeric_limits::max(); for (const String & replica : replicas) { - String pointer = storage.zookeeper->get(storage.zookeeper_path + "/replicas/" + replica + "/log_pointer"); + String pointer = zookeeper->get(storage.zookeeper_path + "/replicas/" + replica + "/log_pointer"); if (pointer.empty()) return; min_pointer = std::min(min_pointer, parse(pointer)); } - Strings entries = storage.zookeeper->getChildren(storage.zookeeper_path + "/log"); + Strings entries = zookeeper->getChildren(storage.zookeeper_path + "/log"); std::sort(entries.begin(), entries.end()); /// Не будем трогать последние replicated_logs_to_keep записей. @@ -134,7 +137,7 @@ void ReplicatedMergeTreeCleanupThread::clearOldLogs() { /// Одновременно с очисткой лога проверим, не добавилась ли реплика с тех пор, как мы получили список реплик. ops.push_back(new zkutil::Op::Check(storage.zookeeper_path + "/replicas", stat.version)); - storage.zookeeper->multi(ops); + zookeeper->multi(ops); ops.clear(); } } @@ -145,8 +148,10 @@ void ReplicatedMergeTreeCleanupThread::clearOldLogs() void ReplicatedMergeTreeCleanupThread::clearOldBlocks() { + auto zookeeper = storage.getZooKeeper(); + zkutil::Stat stat; - if (!storage.zookeeper->exists(storage.zookeeper_path + "/blocks", &stat)) + if (!zookeeper->exists(storage.zookeeper_path + "/blocks", &stat)) throw Exception(storage.zookeeper_path + "/blocks doesn't exist", ErrorCodes::NOT_FOUND_NODE); int children_count = stat.numChildren; @@ -158,14 +163,14 @@ void ReplicatedMergeTreeCleanupThread::clearOldBlocks() LOG_TRACE(log, "Clearing about " << static_cast(children_count) - storage.data.settings.replicated_deduplication_window << " old blocks from ZooKeeper. This might take several minutes."); - Strings blocks = storage.zookeeper->getChildren(storage.zookeeper_path + "/blocks"); + Strings blocks = zookeeper->getChildren(storage.zookeeper_path + "/blocks"); std::vector > timed_blocks; for (const String & block : blocks) { zkutil::Stat stat; - storage.zookeeper->exists(storage.zookeeper_path + "/blocks/" + block, &stat); + zookeeper->exists(storage.zookeeper_path + "/blocks/" + block, &stat); timed_blocks.push_back(std::make_pair(stat.czxid, block)); } @@ -180,7 +185,7 @@ void ReplicatedMergeTreeCleanupThread::clearOldBlocks() if (ops.size() > 400 || i + 1 == timed_blocks.size()) { - storage.zookeeper->multi(ops); + zookeeper->multi(ops); ops.clear(); } } diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp index b817eca19f0..d4429d1b070 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp @@ -38,7 +38,7 @@ void ReplicatedMergeTreeRestartingThread::run() /// Запуск реплики при старте сервера/создании таблицы. Перезапуск реплики при истечении сессии с ZK. while (!need_stop) { - if (first_time || storage.zookeeper->expired()) + if (first_time || storage.getZooKeeper()->expired()) { if (first_time) { @@ -56,8 +56,7 @@ void ReplicatedMergeTreeRestartingThread::run() { try { - /// TODO race condition при присваивании? - storage.zookeeper = storage.context.getZooKeeper(); + storage.setZooKeeper(storage.context.getZooKeeper()); } catch (const zkutil::KeeperException & e) { @@ -112,7 +111,8 @@ bool ReplicatedMergeTreeRestartingThread::tryStartup() storage.leader_election = new zkutil::LeaderElection( storage.zookeeper_path + "/leader_election", - *storage.zookeeper, + *storage.current_zookeeper, /// current_zookeeper живёт в течение времени жизни leader_election, + /// так как до изменения current_zookeeper, объект leader_election уничтожается в методе partialShutdown. [this] { storage.becomeLeader(); }, storage.replica_name); @@ -166,6 +166,7 @@ bool ReplicatedMergeTreeRestartingThread::tryStartup() void ReplicatedMergeTreeRestartingThread::activateReplica() { auto host_port = storage.context.getInterserverIOAddress(); + auto zookeeper = storage.getZooKeeper(); std::string address; { @@ -181,18 +182,18 @@ void ReplicatedMergeTreeRestartingThread::activateReplica() * но он крайне маловероятен при нормальном использовании. */ String data; - if (storage.zookeeper->tryGet(storage.replica_path + "/is_active", data) && data == active_node_identifier) - storage.zookeeper->tryRemove(storage.replica_path + "/is_active"); + if (zookeeper->tryGet(storage.replica_path + "/is_active", data) && data == active_node_identifier) + zookeeper->tryRemove(storage.replica_path + "/is_active"); /// Одновременно объявим, что эта реплика активна, и обновим хост. zkutil::Ops ops; ops.push_back(new zkutil::Op::Create(storage.replica_path + "/is_active", - active_node_identifier, storage.zookeeper->getDefaultACL(), zkutil::CreateMode::Ephemeral)); + active_node_identifier, zookeeper->getDefaultACL(), zkutil::CreateMode::Ephemeral)); ops.push_back(new zkutil::Op::SetData(storage.replica_path + "/host", address, -1)); try { - storage.zookeeper->multi(ops); + zookeeper->multi(ops); } catch (const zkutil::KeeperException & e) { @@ -203,7 +204,9 @@ void ReplicatedMergeTreeRestartingThread::activateReplica() throw; } - storage.replica_is_active_node = zkutil::EphemeralNodeHolder::existing(storage.replica_path + "/is_active", *storage.zookeeper); + /// current_zookeeper живёт в течение времени жизни replica_is_active_node, + /// так как до изменения current_zookeeper, объект replica_is_active_node уничтожается в методе partialShutdown. + storage.replica_is_active_node = zkutil::EphemeralNodeHolder::existing(storage.replica_path + "/is_active", *storage.current_zookeeper); } diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp index 6259419ee52..b26a3b39354 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp +++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp @@ -39,7 +39,7 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree( const Names & columns_to_sum_, const MergeTreeSettings & settings_) : IStorage{materialized_columns_, alias_columns_, column_defaults_}, context(context_), - zookeeper(context.getZooKeeper()), database_name(database_name_), + current_zookeeper(context.getZooKeeper()), database_name(database_name_), table_name(name_), full_path(path_ + escapeForFileName(table_name) + '/'), zookeeper_path(context.getMacros().expand(zookeeper_path_)), replica_name(context.getMacros().expand(replica_name_)), @@ -61,10 +61,10 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree( try { - if (zookeeper && zookeeper->exists(replica_path + "/flags/force_restore_data")) + if (current_zookeeper && current_zookeeper->exists(replica_path + "/flags/force_restore_data")) { skip_sanity_checks = true; - zookeeper->remove(replica_path + "/flags/force_restore_data"); + current_zookeeper->remove(replica_path + "/flags/force_restore_data"); LOG_WARNING(log, "Skipping the limits on severity of changes to data parts and columns (flag " << replica_path << "/flags/force_restore_data)."); @@ -76,7 +76,7 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree( if (e.code == ZCONNECTIONLOSS) { tryLogCurrentException(__PRETTY_FUNCTION__); - zookeeper = nullptr; + current_zookeeper = nullptr; } else throw; @@ -84,7 +84,7 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree( data.loadDataParts(skip_sanity_checks); - if (!zookeeper) + if (!current_zookeeper) { if (!attach) throw Exception("Can't create replicated table without ZooKeeper", ErrorCodes::NO_ZOOKEEPER); @@ -157,11 +157,11 @@ StoragePtr StorageReplicatedMergeTree::create( columns_, materialized_columns_, alias_columns_, column_defaults_, context_, primary_expr_ast_, date_column_name_, sampling_expression_, index_granularity_, mode_, - sign_column_, columns_to_sum_, settings_ - }; + sign_column_, columns_to_sum_, settings_}; + StoragePtr res_ptr = res->thisPtr(); - if (res->zookeeper) + if (res->getZooKeeper()) { String endpoint_name = "ReplicatedMergeTree:" + res->replica_path; InterserverIOEndpointPtr endpoint = new ReplicatedMergeTreePartsServer(res->data, *res); @@ -184,6 +184,8 @@ static String formattedAST(const ASTPtr & ast) void StorageReplicatedMergeTree::createTableIfNotExists() { + auto zookeeper = getZooKeeper(); + if (zookeeper->exists(zookeeper_path)) return; @@ -236,6 +238,8 @@ void StorageReplicatedMergeTree::createTableIfNotExists() */ void StorageReplicatedMergeTree::checkTableStructure(bool skip_sanity_checks, bool allow_alter) { + auto zookeeper = getZooKeeper(); + String metadata_str = zookeeper->get(zookeeper_path + "/metadata"); ReadBufferFromString buf(metadata_str); assertString("metadata format version: 1", buf); @@ -297,6 +301,8 @@ void StorageReplicatedMergeTree::checkTableStructure(bool skip_sanity_checks, bo void StorageReplicatedMergeTree::createReplica() { + auto zookeeper = getZooKeeper(); + LOG_DEBUG(log, "Creating replica " << replica_path); /// Создадим пустую реплику. Ноду columns создадим в конце - будем использовать ее в качестве признака, что создание реплики завершено. @@ -431,6 +437,8 @@ void StorageReplicatedMergeTree::createReplica() void StorageReplicatedMergeTree::checkParts(bool skip_sanity_checks) { + auto zookeeper = getZooKeeper(); + Strings expected_parts_vec = zookeeper->getChildren(replica_path + "/parts"); /// Куски в ZK. @@ -574,6 +582,8 @@ void StorageReplicatedMergeTree::initVirtualParts() void StorageReplicatedMergeTree::checkPartAndAddToZooKeeper(const MergeTreeData::DataPartPtr & part, zkutil::Ops & ops, String part_name) { + auto zookeeper = getZooKeeper(); + if (part_name.empty()) part_name = part->name; @@ -641,6 +651,8 @@ void StorageReplicatedMergeTree::checkPartAndAddToZooKeeper(const MergeTreeData: void StorageReplicatedMergeTree::loadQueue() { + auto zookeeper = getZooKeeper(); + std::lock_guard lock(queue_mutex); Strings children = zookeeper->getChildren(replica_path + "/queue"); @@ -660,6 +672,8 @@ void StorageReplicatedMergeTree::loadQueue() void StorageReplicatedMergeTree::pullLogsToQueue(zkutil::EventPtr next_update_event) { + auto zookeeper = getZooKeeper(); + std::lock_guard lock(queue_mutex); String index_str = zookeeper->get(replica_path + "/log_pointer"); @@ -754,6 +768,8 @@ bool StorageReplicatedMergeTree::shouldExecuteLogEntry(const LogEntry & entry) bool StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry, BackgroundProcessingPool::Context & pool_context) { + auto zookeeper = getZooKeeper(); + if (entry.type == LogEntry::DROP_RANGE) { executeDropRange(entry); @@ -976,6 +992,8 @@ bool StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry, Backgro void StorageReplicatedMergeTree::executeDropRange(const StorageReplicatedMergeTree::LogEntry & entry) { + auto zookeeper = getZooKeeper(); + LOG_INFO(log, (entry.detach ? "Detaching" : "Removing") << " parts inside " << entry.new_part_name << "."); { @@ -1065,6 +1083,8 @@ void StorageReplicatedMergeTree::executeDropRange(const StorageReplicatedMergeTr bool StorageReplicatedMergeTree::executeAttachPart(const StorageReplicatedMergeTree::LogEntry & entry) { + auto zookeeper = getZooKeeper(); + String source_path = (entry.attach_unreplicated ? "unreplicated/" : "detached/") + entry.source_part_name; LOG_INFO(log, "Attaching part " << entry.source_part_name << " from " << source_path << " as " << entry.new_part_name); @@ -1177,6 +1197,7 @@ bool StorageReplicatedMergeTree::queueTask(BackgroundProcessingPool::Context & p { if (executeLogEntry(*entry, pool_context)) { + auto zookeeper = getZooKeeper(); auto code = zookeeper->tryRemove(replica_path + "/queue/" + entry->znode_name); if (code != ZOK) @@ -1259,6 +1280,7 @@ void StorageReplicatedMergeTree::mergeSelectingThread() return true; String month_name = left->name.substr(0, 6); + auto zookeeper = getZooKeeper(); /// Можно слить куски, если все номера между ними заброшены - не соответствуют никаким блокам. for (UInt64 number = left->right + 1; number <= right->left - 1; ++number) /// Номера блоков больше нуля. @@ -1340,6 +1362,8 @@ void StorageReplicatedMergeTree::mergeSelectingThread() do { + auto zookeeper = getZooKeeper(); + if (merges_queued >= data.settings.max_replicated_merges_in_queue) { LOG_TRACE(log, "Number of queued merges is greater than max_replicated_merges_in_queue, so won't select new parts to merge."); @@ -1445,6 +1469,8 @@ void StorageReplicatedMergeTree::alterThread() * TODO: Слишком сложно, всё переделать. */ + auto zookeeper = getZooKeeper(); + zkutil::Stat stat; const String columns_str = zookeeper->get(zookeeper_path + "/columns", &stat, alter_thread_event); auto columns_desc = ColumnsDescription::parse(columns_str, context.getDataTypeFactory()); @@ -1603,6 +1629,8 @@ void StorageReplicatedMergeTree::alterThread() void StorageReplicatedMergeTree::removePartAndEnqueueFetch(const String & part_name) { + auto zookeeper = getZooKeeper(); + String part_path = replica_path + "/parts/" + part_name; LogEntryPtr log_entry = new LogEntry; @@ -1648,6 +1676,8 @@ void StorageReplicatedMergeTree::partCheckThread() { try { + auto zookeeper = getZooKeeper(); + /// Достанем из очереди кусок для проверки. String part_name; { @@ -1865,6 +1895,7 @@ void StorageReplicatedMergeTree::becomeLeader() String StorageReplicatedMergeTree::findReplicaHavingPart(const String & part_name, bool active) { + auto zookeeper = getZooKeeper(); Strings replicas = zookeeper->getChildren(zookeeper_path + "/replicas"); /// Из реплик, у которых есть кусок, выберем одну равновероятно. @@ -1883,6 +1914,8 @@ String StorageReplicatedMergeTree::findReplicaHavingPart(const String & part_nam void StorageReplicatedMergeTree::fetchPart(const String & part_name, const String & replica_path, bool to_detached) { + auto zookeeper = getZooKeeper(); + LOG_DEBUG(log, "Fetching part " << part_name << " from " << replica_path); TableStructureReadLockPtr table_lock; @@ -2066,6 +2099,8 @@ bool StorageReplicatedMergeTree::optimize() void StorageReplicatedMergeTree::alter(const AlterCommands & params, const String & database_name, const String & table_name, Context & context) { + auto zookeeper = getZooKeeper(); + LOG_DEBUG(log, "Doing ALTER"); NamesAndTypesList new_columns; @@ -2183,6 +2218,7 @@ static String getFakePartNameForDrop(const String & month_name, UInt64 left, UIn void StorageReplicatedMergeTree::dropPartition(const Field & field, bool detach, const Settings & settings) { + auto zookeeper = getZooKeeper(); String month_name = MergeTreeData::getMonthName(field); /// TODO: Делать запрос в лидера по TCP. @@ -2241,6 +2277,7 @@ void StorageReplicatedMergeTree::dropPartition(const Field & field, bool detach, void StorageReplicatedMergeTree::attachPartition(const Field & field, bool unreplicated, bool attach_part, const Settings & settings) { + auto zookeeper = getZooKeeper(); String partition; if (attach_part) @@ -2349,6 +2386,8 @@ void StorageReplicatedMergeTree::drop() if (is_readonly) throw Exception("Can't drop readonly replicated table (need to drop data in ZooKeeper as well)", ErrorCodes::TABLE_IS_READ_ONLY); + auto zookeeper = getZooKeeper(); + shutdown(); LOG_INFO(log, "Removing replica " << replica_path); @@ -2385,6 +2424,8 @@ void StorageReplicatedMergeTree::rename(const String & new_path_to_db, const Str AbandonableLockInZooKeeper StorageReplicatedMergeTree::allocateBlockNumber(const String & month_name) { + auto zookeeper = getZooKeeper(); + String month_path = zookeeper_path + "/block_numbers/" + month_name; if (!zookeeper->exists(month_path)) { @@ -2410,6 +2451,7 @@ AbandonableLockInZooKeeper StorageReplicatedMergeTree::allocateBlockNumber(const void StorageReplicatedMergeTree::waitForAllReplicasToProcessLogEntry(const LogEntry & entry) { + auto zookeeper = getZooKeeper(); LOG_DEBUG(log, "Waiting for all replicas to process " << entry.znode_name); Strings replicas = zookeeper->getChildren(zookeeper_path + "/replicas"); @@ -2422,6 +2464,8 @@ void StorageReplicatedMergeTree::waitForAllReplicasToProcessLogEntry(const LogEn void StorageReplicatedMergeTree::waitForReplicaToProcessLogEntry(const String & replica, const LogEntry & entry) { + auto zookeeper = getZooKeeper(); + UInt64 log_index = parse(entry.znode_name.substr(entry.znode_name.size() - 10)); String log_entry_str = entry.toString(); @@ -2479,6 +2523,8 @@ void StorageReplicatedMergeTree::waitForReplicaToProcessLogEntry(const String & void StorageReplicatedMergeTree::getStatus(Status & res, bool with_zk_fields) { + auto zookeeper = getZooKeeper(); + res.is_leader = is_leader_node; res.is_readonly = is_readonly; res.is_session_expired = !zookeeper || zookeeper->expired(); @@ -2551,6 +2597,8 @@ void StorageReplicatedMergeTree::getStatus(Status & res, bool with_zk_fields) void StorageReplicatedMergeTree::fetchPartition(const Field & partition, const String & from_, const Settings & settings) { + auto zookeeper = getZooKeeper(); + String partition_str = MergeTreeData::getMonthName(partition); String from = from_;