diff --git a/programs/server/Server.cpp b/programs/server/Server.cpp index 9d9023267f6..86bb04351b1 100644 --- a/programs/server/Server.cpp +++ b/programs/server/Server.cpp @@ -1094,6 +1094,7 @@ if (ThreadFuzzer::instance().isEffective()) loadMetadataSystem(global_context); /// After attaching system databases we can initialize system log. global_context->initializeSystemLogs(); + global_context->setSystemZooKeeperLogAfterInitializationIfNeeded(); auto & database_catalog = DatabaseCatalog::instance(); /// After the system database is created, attach virtual system tables (in addition to query_log and part_log) attachSystemTablesServer(*database_catalog.getSystemDatabase(), has_zookeeper); diff --git a/src/Common/ZooKeeper/ZooKeeper.cpp b/src/Common/ZooKeeper/ZooKeeper.cpp index f23a16bbf3b..9b3c3191b5d 100644 --- a/src/Common/ZooKeeper/ZooKeeper.cpp +++ b/src/Common/ZooKeeper/ZooKeeper.cpp @@ -135,8 +135,10 @@ void ZooKeeper::init(const std::string & implementation_, const Strings & hosts_ } ZooKeeper::ZooKeeper(const std::string & hosts_string, const std::string & identity_, int32_t session_timeout_ms_, - int32_t operation_timeout_ms_, const std::string & chroot_, const std::string & implementation_) + int32_t operation_timeout_ms_, const std::string & chroot_, const std::string & implementation_, + std::shared_ptr zk_log_) { + zk_log = std::move(zk_log_); Strings hosts_strings; splitInto<','>(hosts_strings, hosts_string); @@ -144,8 +146,10 @@ ZooKeeper::ZooKeeper(const std::string & hosts_string, const std::string & ident } ZooKeeper::ZooKeeper(const Strings & hosts_, const std::string & identity_, int32_t session_timeout_ms_, - int32_t operation_timeout_ms_, const std::string & chroot_, const std::string & implementation_) + int32_t operation_timeout_ms_, const std::string & chroot_, const std::string & implementation_, + std::shared_ptr zk_log_) { + zk_log = std::move(zk_log_); init(implementation_, hosts_, identity_, session_timeout_ms_, operation_timeout_ms_, chroot_); } @@ -729,7 +733,7 @@ bool ZooKeeper::waitForDisappear(const std::string & path, const WaitCondition & ZooKeeperPtr ZooKeeper::startNewSession() const { - return std::make_shared(hosts, identity, session_timeout_ms, operation_timeout_ms, chroot, implementation); + return std::make_shared(hosts, identity, session_timeout_ms, operation_timeout_ms, chroot, implementation, zk_log); } @@ -1020,6 +1024,14 @@ void ZooKeeper::finalize() impl->finalize(); } +void ZooKeeper::setZooKeeperLog(std::shared_ptr zk_log_) +{ + zk_log = std::move(zk_log_); + if (auto * zk = dynamic_cast(impl.get())) + zk->setZooKeeperLog(zk_log); +} + + size_t KeeperMultiException::getFailedOpIndex(Coordination::Error exception_code, const Coordination::Responses & responses) { if (responses.empty()) diff --git a/src/Common/ZooKeeper/ZooKeeper.h b/src/Common/ZooKeeper/ZooKeeper.h index 83b0ebbc472..bfbfea03aae 100644 --- a/src/Common/ZooKeeper/ZooKeeper.h +++ b/src/Common/ZooKeeper/ZooKeeper.h @@ -56,13 +56,15 @@ public: int32_t session_timeout_ms_ = Coordination::DEFAULT_SESSION_TIMEOUT_MS, int32_t operation_timeout_ms_ = Coordination::DEFAULT_OPERATION_TIMEOUT_MS, const std::string & chroot_ = "", - const std::string & implementation_ = "zookeeper"); + const std::string & implementation_ = "zookeeper", + std::shared_ptr zk_log_ = nullptr); ZooKeeper(const Strings & hosts_, const std::string & identity_ = "", int32_t session_timeout_ms_ = Coordination::DEFAULT_SESSION_TIMEOUT_MS, int32_t operation_timeout_ms_ = Coordination::DEFAULT_OPERATION_TIMEOUT_MS, const std::string & chroot_ = "", - const std::string & implementation_ = "zookeeper"); + const std::string & implementation_ = "zookeeper", + std::shared_ptr zk_log_ = nullptr); /** Config of the form: @@ -273,6 +275,8 @@ public: void finalize(); + void setZooKeeperLog(std::shared_ptr zk_log_); + private: friend class EphemeralNodeHolder; diff --git a/src/Common/ZooKeeper/ZooKeeperImpl.cpp b/src/Common/ZooKeeper/ZooKeeperImpl.cpp index bf67dccc684..5f15a3b8b75 100644 --- a/src/Common/ZooKeeper/ZooKeeperImpl.cpp +++ b/src/Common/ZooKeeper/ZooKeeperImpl.cpp @@ -315,9 +315,10 @@ ZooKeeper::ZooKeeper( std::shared_ptr zk_log_) : root_path(root_path_), session_timeout(session_timeout_), - operation_timeout(std::min(operation_timeout_, session_timeout_)), - zk_log(std::move(zk_log_)) + operation_timeout(std::min(operation_timeout_, session_timeout_)) { + std::atomic_store(&zk_log, std::move(zk_log_)); + if (!root_path.empty()) { if (root_path.back() == '/') @@ -1212,9 +1213,16 @@ void ZooKeeper::close() } +void ZooKeeper::setZooKeeperLog(std::shared_ptr zk_log_) +{ + /// logOperationIfNeeded(...) uses zk_log and can be called from different threads, so we have to use atomic shared_ptr + std::atomic_store(&zk_log, std::move(zk_log_)); +} + void ZooKeeper::logOperationIfNeeded(const ZooKeeperRequestPtr & request, const ZooKeeperResponsePtr & response, bool finalize) { - if (!zk_log) + auto maybe_zk_log = std::atomic_load(&zk_log); + if (!maybe_zk_log) return; ZooKeeperLogElement::Type log_type = ZooKeeperLogElement::UNKNOWN; @@ -1249,7 +1257,7 @@ void ZooKeeper::logOperationIfNeeded(const ZooKeeperRequestPtr & request, const elem.event_time = event_time; elem.address = socket.peerAddress(); elem.session_id = session_id; - zk_log->add(elem); + maybe_zk_log->add(elem); } } diff --git a/src/Common/ZooKeeper/ZooKeeperImpl.h b/src/Common/ZooKeeper/ZooKeeperImpl.h index 25c90c342a0..8f0f64ceafa 100644 --- a/src/Common/ZooKeeper/ZooKeeperImpl.h +++ b/src/Common/ZooKeeper/ZooKeeperImpl.h @@ -189,6 +189,8 @@ public: void finalize() override { finalize(false, false); } + void setZooKeeperLog(std::shared_ptr zk_log_); + private: String root_path; ACLs default_acls; diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp index 33ebfbd21e0..413b0ac5582 100644 --- a/src/Interpreters/Context.cpp +++ b/src/Interpreters/Context.cpp @@ -1752,6 +1752,24 @@ zkutil::ZooKeeperPtr Context::getZooKeeper() const return shared->zookeeper; } +void Context::setSystemZooKeeperLogAfterInitializationIfNeeded() +{ + /// It can be nearly impossible to understand in which order global objects are initialized on server startup. + /// If getZooKeeper() is called before initializeSystemLogs(), then zkutil::ZooKeeper gets nullptr + /// instead of pointer to system table and it logs nothing. + /// This method explicitly sets correct pointer to system log after its initialization. + /// TODO get rid of this if possible + + std::lock_guard lock(shared->zookeeper_mutex); + if (!shared->system_logs || !shared->system_logs->zookeeper_log) + return; + + if (shared->zookeeper) + shared->zookeeper->setZooKeeperLog(shared->system_logs->zookeeper_log); + + for (auto & zk : shared->auxiliary_zookeepers) + zk.second->setZooKeeperLog(shared->system_logs->zookeeper_log); +} void Context::initializeKeeperStorageDispatcher() const { diff --git a/src/Interpreters/Context.h b/src/Interpreters/Context.h index 21abb1f8d91..e792fe07ec8 100644 --- a/src/Interpreters/Context.h +++ b/src/Interpreters/Context.h @@ -649,6 +649,8 @@ public: // Reload Zookeeper void reloadZooKeeperIfChanged(const ConfigurationPtr & config) const; + void setSystemZooKeeperLogAfterInitializationIfNeeded(); + /// Create a cache of uncompressed blocks of specified size. This can be done only once. void setUncompressedCache(size_t max_size_in_bytes); std::shared_ptr getUncompressedCache() const;