From 4165ba2a017ab04ae842f4a608ae02705bbe79a3 Mon Sep 17 00:00:00 2001 From: Alexander Tokmakov Date: Fri, 9 Jul 2021 17:05:35 +0300 Subject: [PATCH] add system.zookeeper_log --- docker/test/stateless/run.sh | 7 +- .../extract-from-config/ExtractFromConfig.cpp | 2 +- src/Common/ZooKeeper/ZooKeeper.cpp | 6 +- src/Common/ZooKeeper/ZooKeeper.h | 7 +- src/Common/ZooKeeper/ZooKeeperCommon.cpp | 134 ++++++++++++++++++ src/Common/ZooKeeper/ZooKeeperCommon.h | 32 +++++ src/Common/ZooKeeper/ZooKeeperImpl.cpp | 65 ++++++++- src/Common/ZooKeeper/ZooKeeperImpl.h | 10 +- src/Interpreters/Context.cpp | 26 +++- src/Interpreters/Context.h | 2 + src/Interpreters/DDLWorker.cpp | 30 +++- src/Interpreters/DDLWorker.h | 2 + src/Interpreters/InterpreterSystemQuery.cpp | 4 +- src/Interpreters/SystemLog.cpp | 4 + src/Interpreters/SystemLog.h | 3 + src/Interpreters/ZooKeeperLog.cpp | 130 +++++++++++++++++ src/Interpreters/ZooKeeperLog.h | 76 ++++++++++ tests/config/config.d/zookeeper_log.xml | 7 + tests/config/install.sh | 1 + .../0_stateless/01158_zookeeper_log.reference | 40 ++++++ .../0_stateless/01158_zookeeper_log.sql | 27 ++++ 21 files changed, 593 insertions(+), 22 deletions(-) create mode 100644 src/Interpreters/ZooKeeperLog.cpp create mode 100644 src/Interpreters/ZooKeeperLog.h create mode 100644 tests/config/config.d/zookeeper_log.xml create mode 100644 tests/queries/0_stateless/01158_zookeeper_log.reference create mode 100644 tests/queries/0_stateless/01158_zookeeper_log.sql diff --git a/docker/test/stateless/run.sh b/docker/test/stateless/run.sh index a7fb956bf94..eb3e6ae6cf8 100755 --- a/docker/test/stateless/run.sh +++ b/docker/test/stateless/run.sh @@ -138,15 +138,18 @@ if [[ -n "$WITH_COVERAGE" ]] && [[ "$WITH_COVERAGE" -eq 1 ]]; then fi tar -chf /test_output/text_log_dump.tar /var/lib/clickhouse/data/system/text_log ||: tar -chf /test_output/query_log_dump.tar /var/lib/clickhouse/data/system/query_log ||: +tar -chf /test_output/zookeeper_log_dump.tar /var/lib/clickhouse/data/system/zookeeper_log ||: tar -chf /test_output/coordination.tar /var/lib/clickhouse/coordination ||: if [[ -n "$USE_DATABASE_REPLICATED" ]] && [[ "$USE_DATABASE_REPLICATED" -eq 1 ]]; then - grep -Fa "Fatal" /var/log/clickhouse-server/clickhouse-server1.log ||: - grep -Fa "Fatal" /var/log/clickhouse-server/clickhouse-server2.log ||: + grep -Fa "Fatal" /var/log/clickhouse-server/clickhouse-server1.log ||: + grep -Fa "Fatal" /var/log/clickhouse-server/clickhouse-server2.log ||: pigz < /var/log/clickhouse-server/clickhouse-server1.log > /test_output/clickhouse-server1.log.gz ||: pigz < /var/log/clickhouse-server/clickhouse-server2.log > /test_output/clickhouse-server2.log.gz ||: mv /var/log/clickhouse-server/stderr1.log /test_output/ ||: mv /var/log/clickhouse-server/stderr2.log /test_output/ ||: + tar -chf /test_output/zookeeper_log_dump1.tar /var/lib/clickhouse1/data/system/zookeeper_log ||: + tar -chf /test_output/zookeeper_log_dump2.tar /var/lib/clickhouse2/data/system/zookeeper_log ||: tar -chf /test_output/coordination1.tar /var/lib/clickhouse1/coordination ||: tar -chf /test_output/coordination2.tar /var/lib/clickhouse2/coordination ||: fi diff --git a/programs/extract-from-config/ExtractFromConfig.cpp b/programs/extract-from-config/ExtractFromConfig.cpp index dff7e81c430..3fd665bcb26 100644 --- a/programs/extract-from-config/ExtractFromConfig.cpp +++ b/programs/extract-from-config/ExtractFromConfig.cpp @@ -33,7 +33,7 @@ static std::string extractFromConfig( { DB::ConfigurationPtr bootstrap_configuration(new Poco::Util::XMLConfiguration(config_xml)); zkutil::ZooKeeperPtr zookeeper = std::make_shared( - *bootstrap_configuration, "zookeeper"); + *bootstrap_configuration, "zookeeper", nullptr); zkutil::ZooKeeperNodeCache zk_node_cache([&] { return zookeeper; }); config_xml = processor.processConfig(&has_zk_includes, &zk_node_cache); } diff --git a/src/Common/ZooKeeper/ZooKeeper.cpp b/src/Common/ZooKeeper/ZooKeeper.cpp index 1ee70b0cc3f..f23a16bbf3b 100644 --- a/src/Common/ZooKeeper/ZooKeeper.cpp +++ b/src/Common/ZooKeeper/ZooKeeper.cpp @@ -111,7 +111,8 @@ void ZooKeeper::init(const std::string & implementation_, const Strings & hosts_ identity_, Poco::Timespan(0, session_timeout_ms_ * 1000), Poco::Timespan(0, ZOOKEEPER_CONNECTION_TIMEOUT_MS * 1000), - Poco::Timespan(0, operation_timeout_ms_ * 1000)); + Poco::Timespan(0, operation_timeout_ms_ * 1000), + zk_log); if (chroot.empty()) LOG_TRACE(log, "Initialized, hosts: {}", fmt::join(hosts, ",")); @@ -209,7 +210,8 @@ struct ZooKeeperArgs std::string implementation; }; -ZooKeeper::ZooKeeper(const Poco::Util::AbstractConfiguration & config, const std::string & config_name) +ZooKeeper::ZooKeeper(const Poco::Util::AbstractConfiguration & config, const std::string & config_name, std::shared_ptr zk_log_) + : zk_log(std::move(zk_log_)) { ZooKeeperArgs args(config, config_name); init(args.implementation, args.hosts, args.identity, args.session_timeout_ms, args.operation_timeout_ms, args.chroot); diff --git a/src/Common/ZooKeeper/ZooKeeper.h b/src/Common/ZooKeeper/ZooKeeper.h index 7aafee52bf0..83b0ebbc472 100644 --- a/src/Common/ZooKeeper/ZooKeeper.h +++ b/src/Common/ZooKeeper/ZooKeeper.h @@ -25,6 +25,10 @@ namespace CurrentMetrics extern const Metric EphemeralNode; } +namespace DB +{ + class ZooKeeperLog; +} namespace zkutil { @@ -82,7 +86,7 @@ public: user:password */ - ZooKeeper(const Poco::Util::AbstractConfiguration & config, const std::string & config_name); + ZooKeeper(const Poco::Util::AbstractConfiguration & config, const std::string & config_name, std::shared_ptr zk_log_); /// Creates a new session with the same parameters. This method can be used for reconnecting /// after the session has expired. @@ -298,6 +302,7 @@ private: std::mutex mutex; Poco::Logger * log = nullptr; + std::shared_ptr zk_log; }; diff --git a/src/Common/ZooKeeper/ZooKeeperCommon.cpp b/src/Common/ZooKeeper/ZooKeeperCommon.cpp index 1560d7a25da..74c2a01235b 100644 --- a/src/Common/ZooKeeper/ZooKeeperCommon.cpp +++ b/src/Common/ZooKeeper/ZooKeeperCommon.cpp @@ -537,6 +537,140 @@ void ZooKeeperSessionIDResponse::writeImpl(WriteBuffer & out) const Coordination::write(server_id, out); } + +void ZooKeeperRequest::createLogElements(LogElements & elems) const +{ + elems.emplace_back(); + auto & elem = elems.back(); + elem.xid = xid; + elem.has_watch = has_watch; + elem.op_num = static_cast(getOpNum()); + elem.path = getPath(); + elem.request_idx = elems.size() - 1; +} + + +void ZooKeeperCreateRequest::createLogElements(LogElements & elems) const +{ + ZooKeeperRequest::createLogElements(elems); + auto & elem = elems.back(); + elem.data = data; + elem.is_ephemeral = is_ephemeral; + elem.is_sequential = is_sequential; +} + +void ZooKeeperRemoveRequest::createLogElements(LogElements & elems) const +{ + ZooKeeperRequest::createLogElements(elems); + auto & elem = elems.back(); + elem.version = version; +} + +void ZooKeeperSetRequest::createLogElements(LogElements & elems) const +{ + ZooKeeperRequest::createLogElements(elems); + auto & elem = elems.back(); + elem.data = data; + elem.version = version; +} + +void ZooKeeperCheckRequest::createLogElements(LogElements & elems) const +{ + ZooKeeperRequest::createLogElements(elems); + auto & elem = elems.back(); + elem.version = version; +} + +void ZooKeeperMultiRequest::createLogElements(LogElements & elems) const +{ + ZooKeeperRequest::createLogElements(elems); + elems.back().requests_size = requests.size(); + for (const auto & request : requests) + { + auto * req = dynamic_cast(request.get()); + assert(!req->xid || req->xid == xid); + req->xid = xid; + req->createLogElements(elems); + } +} + + +void ZooKeeperResponse::fillLogElements(LogElements & elems, size_t idx) const +{ + auto & elem = elems[idx]; + assert(!elem.xid || elem.xid == xid); + elem.xid = xid; + int32_t response_op = tryGetOpNum(); + assert(!elem.op_num || elem.op_num == response_op || response_op < 0); + elem.op_num = response_op; + + elem.zxid = zxid; + elem.error = static_cast(error); +} + +void ZooKeeperWatchResponse::fillLogElements(LogElements & elems, size_t idx) const +{ + ZooKeeperResponse::fillLogElements(elems, idx); + auto & elem = elems[idx]; + elem.watch_type = type; + elem.watch_state = state; + elem.path = path; +} + +void ZooKeeperCreateResponse::fillLogElements(LogElements & elems, size_t idx) const +{ + ZooKeeperResponse::fillLogElements(elems, idx); + auto & elem = elems[idx]; + elem.path_created = path_created; +} + +void ZooKeeperExistsResponse::fillLogElements(LogElements & elems, size_t idx) const +{ + ZooKeeperResponse::fillLogElements(elems, idx); + auto & elem = elems[idx]; + elem.stat = stat; +} + +void ZooKeeperGetResponse::fillLogElements(LogElements & elems, size_t idx) const +{ + ZooKeeperResponse::fillLogElements(elems, idx); + auto & elem = elems[idx]; + elem.data = data; + elem.stat = stat; +} + +void ZooKeeperSetResponse::fillLogElements(LogElements & elems, size_t idx) const +{ + ZooKeeperResponse::fillLogElements(elems, idx); + auto & elem = elems[idx]; + elem.stat = stat; +} + +void ZooKeeperListResponse::fillLogElements(LogElements & elems, size_t idx) const +{ + ZooKeeperResponse::fillLogElements(elems, idx); + auto & elem = elems[idx]; + elem.stat = stat; + elem.children = names; +} + +void ZooKeeperMultiResponse::fillLogElements(LogElements & elems, size_t idx) const +{ + assert(idx == 0); + assert(elems.size() == responses.size() + 1); + ZooKeeperResponse::fillLogElements(elems, idx); + for (const auto & response : responses) + { + auto * resp = dynamic_cast(response.get()); + assert(!resp->xid || resp->xid == xid); + assert(!resp->zxid || resp->zxid == zxid); + resp->xid = xid; + resp->zxid = zxid; + resp->fillLogElements(elems, ++idx); + } +} + + void ZooKeeperRequestFactory::registerRequest(OpNum op_num, Creator creator) { if (!op_num_to_request.try_emplace(op_num, creator).second) diff --git a/src/Common/ZooKeeper/ZooKeeperCommon.h b/src/Common/ZooKeeper/ZooKeeperCommon.h index a816c1eb8bb..8e7c0832a09 100644 --- a/src/Common/ZooKeeper/ZooKeeperCommon.h +++ b/src/Common/ZooKeeper/ZooKeeperCommon.h @@ -2,6 +2,7 @@ #include #include +#include #include #include @@ -22,6 +23,8 @@ namespace Coordination { +using LogElements = std::vector; + struct ZooKeeperResponse : virtual Response { XID xid = 0; @@ -32,6 +35,8 @@ struct ZooKeeperResponse : virtual Response virtual void writeImpl(WriteBuffer &) const = 0; virtual void write(WriteBuffer & out) const; virtual OpNum getOpNum() const = 0; + virtual void fillLogElements(LogElements & elems, size_t idx) const; + virtual int32_t tryGetOpNum() const { return static_cast(getOpNum()); } }; using ZooKeeperResponsePtr = std::shared_ptr; @@ -61,6 +66,8 @@ struct ZooKeeperRequest : virtual Request virtual ZooKeeperResponsePtr makeResponse() const = 0; virtual bool isReadRequest() const = 0; + + virtual void createLogElements(LogElements & elems) const; }; using ZooKeeperRequestPtr = std::shared_ptr; @@ -117,6 +124,9 @@ struct ZooKeeperWatchResponse final : WatchResponse, ZooKeeperResponse { throw Exception("OpNum for watch response doesn't exist", Error::ZRUNTIMEINCONSISTENCY); } + + void fillLogElements(LogElements & elems, size_t idx) const override; + int32_t tryGetOpNum() const override { return 0; } }; struct ZooKeeperAuthRequest final : ZooKeeperRequest @@ -184,6 +194,8 @@ struct ZooKeeperCreateRequest final : public CreateRequest, ZooKeeperRequest size_t bytesSize() const override { return CreateRequest::bytesSize() + sizeof(xid) + sizeof(has_watch); } + void createLogElements(LogElements & elems) const override; + /// During recovery from log we don't rehash ACLs bool need_to_hash_acls = true; }; @@ -197,6 +209,8 @@ struct ZooKeeperCreateResponse final : CreateResponse, ZooKeeperResponse OpNum getOpNum() const override { return OpNum::Create; } size_t bytesSize() const override { return CreateResponse::bytesSize() + sizeof(xid) + sizeof(zxid); } + + void fillLogElements(LogElements & elems, size_t idx) const override; }; struct ZooKeeperRemoveRequest final : RemoveRequest, ZooKeeperRequest @@ -212,6 +226,8 @@ struct ZooKeeperRemoveRequest final : RemoveRequest, ZooKeeperRequest bool isReadRequest() const override { return false; } size_t bytesSize() const override { return RemoveRequest::bytesSize() + sizeof(xid); } + + void createLogElements(LogElements & elems) const override; }; struct ZooKeeperRemoveResponse final : RemoveResponse, ZooKeeperResponse @@ -242,6 +258,8 @@ struct ZooKeeperExistsResponse final : ExistsResponse, ZooKeeperResponse OpNum getOpNum() const override { return OpNum::Exists; } size_t bytesSize() const override { return ExistsResponse::bytesSize() + sizeof(xid) + sizeof(zxid); } + + void fillLogElements(LogElements & elems, size_t idx) const override; }; struct ZooKeeperGetRequest final : GetRequest, ZooKeeperRequest @@ -263,6 +281,8 @@ struct ZooKeeperGetResponse final : GetResponse, ZooKeeperResponse OpNum getOpNum() const override { return OpNum::Get; } size_t bytesSize() const override { return GetResponse::bytesSize() + sizeof(xid) + sizeof(zxid); } + + void fillLogElements(LogElements & elems, size_t idx) const override; }; struct ZooKeeperSetRequest final : SetRequest, ZooKeeperRequest @@ -277,6 +297,8 @@ struct ZooKeeperSetRequest final : SetRequest, ZooKeeperRequest bool isReadRequest() const override { return false; } size_t bytesSize() const override { return SetRequest::bytesSize() + sizeof(xid); } + + void createLogElements(LogElements & elems) const override; }; struct ZooKeeperSetResponse final : SetResponse, ZooKeeperResponse @@ -286,6 +308,8 @@ struct ZooKeeperSetResponse final : SetResponse, ZooKeeperResponse OpNum getOpNum() const override { return OpNum::Set; } size_t bytesSize() const override { return SetResponse::bytesSize() + sizeof(xid) + sizeof(zxid); } + + void fillLogElements(LogElements & elems, size_t idx) const override; }; struct ZooKeeperListRequest : ListRequest, ZooKeeperRequest @@ -311,6 +335,8 @@ struct ZooKeeperListResponse : ListResponse, ZooKeeperResponse OpNum getOpNum() const override { return OpNum::List; } size_t bytesSize() const override { return ListResponse::bytesSize() + sizeof(xid) + sizeof(zxid); } + + void fillLogElements(LogElements & elems, size_t idx) const override; }; struct ZooKeeperSimpleListResponse final : ZooKeeperListResponse @@ -331,6 +357,8 @@ struct ZooKeeperCheckRequest final : CheckRequest, ZooKeeperRequest bool isReadRequest() const override { return true; } size_t bytesSize() const override { return CheckRequest::bytesSize() + sizeof(xid) + sizeof(has_watch); } + + void createLogElements(LogElements & elems) const override; }; struct ZooKeeperCheckResponse final : CheckResponse, ZooKeeperResponse @@ -409,6 +437,8 @@ struct ZooKeeperMultiRequest final : MultiRequest, ZooKeeperRequest bool isReadRequest() const override; size_t bytesSize() const override { return MultiRequest::bytesSize() + sizeof(xid) + sizeof(has_watch); } + + void createLogElements(LogElements & elems) const override; }; struct ZooKeeperMultiResponse final : MultiResponse, ZooKeeperResponse @@ -433,6 +463,8 @@ struct ZooKeeperMultiResponse final : MultiResponse, ZooKeeperResponse void writeImpl(WriteBuffer & out) const override; size_t bytesSize() const override { return MultiResponse::bytesSize() + sizeof(xid) + sizeof(zxid); } + + void fillLogElements(LogElements & elems, size_t idx) const override; }; /// Fake internal coordination (keeper) response. Never received from client diff --git a/src/Common/ZooKeeper/ZooKeeperImpl.cpp b/src/Common/ZooKeeper/ZooKeeperImpl.cpp index 37cc1dddce2..f242f29c44d 100644 --- a/src/Common/ZooKeeper/ZooKeeperImpl.cpp +++ b/src/Common/ZooKeeper/ZooKeeperImpl.cpp @@ -311,10 +311,12 @@ ZooKeeper::ZooKeeper( const String & auth_data, Poco::Timespan session_timeout_, Poco::Timespan connection_timeout, - Poco::Timespan operation_timeout_) + Poco::Timespan operation_timeout_, + std::shared_ptr zk_log_) : root_path(root_path_), session_timeout(session_timeout_), - operation_timeout(std::min(operation_timeout_, session_timeout_)) + operation_timeout(std::min(operation_timeout_, session_timeout_)), + zk_log(std::move(zk_log_)) { if (!root_path.empty()) { @@ -578,6 +580,8 @@ void ZooKeeper::sendThread() info.request->probably_sent = true; info.request->write(*out); + logOperationIfNeeded(info.request); + /// We sent close request, exit if (info.request->xid == CLOSE_XID) break; @@ -747,6 +751,9 @@ void ZooKeeper::receiveEvent() if (!response) response = request_info.request->makeResponse(); + response->xid = xid; + response->zxid = zxid; + if (err != Error::ZOK) { response->error = err; @@ -785,6 +792,8 @@ void ZooKeeper::receiveEvent() int32_t actual_length = in->count() - count_before_event; if (length != actual_length) throw Exception("Response length doesn't match. Expected: " + DB::toString(length) + ", actual: " + DB::toString(actual_length), Error::ZMARSHALLINGERROR); + + logOperationIfNeeded(request_info.request, response); } catch (...) { @@ -802,6 +811,8 @@ void ZooKeeper::receiveEvent() { if (request_info.callback) request_info.callback(*response); + + logOperationIfNeeded(request_info.request, response); } catch (...) { @@ -879,17 +890,19 @@ void ZooKeeper::finalize(bool error_send, bool error_receive) for (auto & op : operations) { RequestInfo & request_info = op.second; - ResponsePtr response = request_info.request->makeResponse(); + ZooKeeperResponsePtr response = request_info.request->makeResponse(); response->error = request_info.request->probably_sent ? Error::ZCONNECTIONLOSS : Error::ZSESSIONEXPIRED; + response->xid = request_info.request->xid; if (request_info.callback) { try { request_info.callback(*response); + logOperationIfNeeded(request_info.request, response, true); } catch (...) { @@ -941,13 +954,15 @@ void ZooKeeper::finalize(bool error_send, bool error_receive) { if (info.callback) { - ResponsePtr response = info.request->makeResponse(); + ZooKeeperResponsePtr response = info.request->makeResponse(); if (response) { response->error = Error::ZSESSIONEXPIRED; + response->xid = info.request->xid; try { info.callback(*response); + logOperationIfNeeded(info.request, response, true); } catch (...) { @@ -1189,4 +1204,46 @@ void ZooKeeper::close() ProfileEvents::increment(ProfileEvents::ZooKeeperClose); } + +void ZooKeeper::logOperationIfNeeded(const ZooKeeperRequestPtr & request, const ZooKeeperResponsePtr & response, bool finalize) +{ + if (!zk_log) + return; + + ZooKeeperLogElement::Type log_type = ZooKeeperLogElement::UNKNOWN; + Decimal64 event_time = std::chrono::duration_cast( + std::chrono::system_clock::now().time_since_epoch() + ).count(); + LogElements elems; + if (request) + { + request->createLogElements(elems); + log_type = ZooKeeperLogElement::SEND; + } + else + { + assert(response); + assert(response->xid == PING_XID || response->xid == WATCH_XID); + elems.emplace_back(); + } + + if (response) + { + response->fillLogElements(elems, 0); + log_type = ZooKeeperLogElement::RECEIVE; + } + + if (finalize) + log_type = ZooKeeperLogElement::FINALIZE; + + for (auto & elem : elems) + { + elem.type = log_type; + elem.event_time = event_time; + elem.address = socket.peerAddress(); + elem.session_id = session_id; + zk_log->add(elem); + } +} + } diff --git a/src/Common/ZooKeeper/ZooKeeperImpl.h b/src/Common/ZooKeeper/ZooKeeperImpl.h index 2210fd98b18..25c90c342a0 100644 --- a/src/Common/ZooKeeper/ZooKeeperImpl.h +++ b/src/Common/ZooKeeper/ZooKeeperImpl.h @@ -80,6 +80,10 @@ namespace CurrentMetrics extern const Metric ZooKeeperSession; } +namespace DB +{ + class ZooKeeperLog; +} namespace Coordination { @@ -110,7 +114,8 @@ public: const String & auth_data, Poco::Timespan session_timeout_, Poco::Timespan connection_timeout, - Poco::Timespan operation_timeout_); + Poco::Timespan operation_timeout_, + std::shared_ptr zk_log_); ~ZooKeeper() override; @@ -258,7 +263,10 @@ private: template void read(T &); + void logOperationIfNeeded(const ZooKeeperRequestPtr & request, const ZooKeeperResponsePtr & response = nullptr, bool finalize = false); + CurrentMetrics::Increment active_session_metric_increment{CurrentMetrics::ZooKeeperSession}; + std::shared_ptr zk_log; }; } diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp index 9b204f12ab2..abc65d39c83 100644 --- a/src/Interpreters/Context.cpp +++ b/src/Interpreters/Context.cpp @@ -1698,7 +1698,7 @@ zkutil::ZooKeeperPtr Context::getZooKeeper() const const auto & config = shared->zookeeper_config ? *shared->zookeeper_config : getConfigRef(); if (!shared->zookeeper) - shared->zookeeper = std::make_shared(config, "zookeeper"); + shared->zookeeper = std::make_shared(config, "zookeeper", getZooKeeperLog()); else if (shared->zookeeper->expired()) shared->zookeeper = shared->zookeeper->startNewSession(); @@ -1762,8 +1762,8 @@ zkutil::ZooKeeperPtr Context::getAuxiliaryZooKeeper(const String & name) const "config.xml", name); - zookeeper - = shared->auxiliary_zookeepers.emplace(name, std::make_shared(config, "auxiliary_zookeepers." + name)).first; + zookeeper = shared->auxiliary_zookeepers.emplace(name, + std::make_shared(config, "auxiliary_zookeepers." + name, getZooKeeperLog())).first; } else if (zookeeper->second->expired()) zookeeper->second = zookeeper->second->startNewSession(); @@ -1777,14 +1777,15 @@ void Context::resetZooKeeper() const shared->zookeeper.reset(); } -static void reloadZooKeeperIfChangedImpl(const ConfigurationPtr & config, const std::string & config_name, zkutil::ZooKeeperPtr & zk) +static void reloadZooKeeperIfChangedImpl(const ConfigurationPtr & config, const std::string & config_name, zkutil::ZooKeeperPtr & zk, + std::shared_ptr zk_log) { if (!zk || zk->configChanged(*config, config_name)) { if (zk) zk->finalize(); - zk = std::make_shared(*config, config_name); + zk = std::make_shared(*config, config_name, std::move(zk_log)); } } @@ -1792,7 +1793,7 @@ void Context::reloadZooKeeperIfChanged(const ConfigurationPtr & config) const { std::lock_guard lock(shared->zookeeper_mutex); shared->zookeeper_config = config; - reloadZooKeeperIfChangedImpl(config, "zookeeper", shared->zookeeper); + reloadZooKeeperIfChangedImpl(config, "zookeeper", shared->zookeeper, getZooKeeperLog()); } void Context::reloadAuxiliaryZooKeepersConfigIfChanged(const ConfigurationPtr & config) @@ -1807,7 +1808,7 @@ void Context::reloadAuxiliaryZooKeepersConfigIfChanged(const ConfigurationPtr & it = shared->auxiliary_zookeepers.erase(it); else { - reloadZooKeeperIfChangedImpl(config, "auxiliary_zookeepers." + it->first, it->second); + reloadZooKeeperIfChangedImpl(config, "auxiliary_zookeepers." + it->first, it->second, getZooKeeperLog()); ++it; } } @@ -2090,6 +2091,17 @@ std::shared_ptr Context::getOpenTelemetrySpanLog() const } +std::shared_ptr Context::getZooKeeperLog() const +{ + auto lock = getLock(); + + if (!shared->system_logs) + return {}; + + return shared->system_logs->zookeeper_log; +} + + CompressionCodecPtr Context::chooseCompressionCodec(size_t part_size, double part_size_ratio) const { auto lock = getLock(); diff --git a/src/Interpreters/Context.h b/src/Interpreters/Context.h index 2b53c737915..0cdb5864248 100644 --- a/src/Interpreters/Context.h +++ b/src/Interpreters/Context.h @@ -76,6 +76,7 @@ class TraceLog; class MetricLog; class AsynchronousMetricLog; class OpenTelemetrySpanLog; +class ZooKeeperLog; struct MergeTreeSettings; class StorageS3Settings; class IDatabase; @@ -705,6 +706,7 @@ public: std::shared_ptr getMetricLog() const; std::shared_ptr getAsynchronousMetricLog() const; std::shared_ptr getOpenTelemetrySpanLog() const; + std::shared_ptr getZooKeeperLog() const; /// Returns an object used to log operations with parts if it possible. /// Provide table name to make required checks. diff --git a/src/Interpreters/DDLWorker.cpp b/src/Interpreters/DDLWorker.cpp index 4e51c346b6f..47ca2b72db8 100644 --- a/src/Interpreters/DDLWorker.cpp +++ b/src/Interpreters/DDLWorker.cpp @@ -31,6 +31,8 @@ #include #include +#include + namespace fs = std::filesystem; @@ -371,7 +373,7 @@ void DDLWorker::scheduleTasks(bool reinitialized) } } - Strings queue_nodes = zookeeper->getChildren(queue_dir, nullptr, queue_updated_event); + Strings queue_nodes = zookeeper->getChildren(queue_dir, &queue_node_stat, queue_updated_event); size_t size_before_filtering = queue_nodes.size(); filterAndSortQueueNodes(queue_nodes); /// The following message is too verbose, but it can be useful too debug mysterious test failures in CI @@ -1136,10 +1138,32 @@ void DDLWorker::runMainThread() cleanup_event->set(); scheduleTasks(reinitialized); - LOG_DEBUG(log, "Waiting for queue updates"); + LOG_DEBUG(log, "Waiting for queue updates (stat: {}, {}, {}, {})", + queue_node_stat.version, queue_node_stat.cversion, queue_node_stat.numChildren, queue_node_stat.pzxid); /// FIXME It may hang for unknown reason. Timeout is just a hotfix. constexpr int queue_wait_timeout_ms = 10000; - queue_updated_event->tryWait(queue_wait_timeout_ms); + bool updated = queue_updated_event->tryWait(queue_wait_timeout_ms); + if (!updated) + { + Coordination::Stat new_stat; + tryGetZooKeeper()->get(queue_dir, &new_stat); + bool queue_changed = memcmp(&queue_node_stat, &new_stat, sizeof(Coordination::Stat)) != 0; + bool watch_triggered = queue_updated_event->tryWait(0); + if (queue_changed && !watch_triggered) + { + /// It should never happen. + /// Maybe log message, abort() and system.zookeeper_log will help to debug it and remove timeout (#26036). + LOG_TRACE( + log, + "Queue was not updated (stat: {}, {}, {}, {})", + new_stat.version, + new_stat.cversion, + new_stat.numChildren, + new_stat.pzxid); + context->getZooKeeperLog()->flush(); + abort(); + } + } } catch (const Coordination::Exception & e) { diff --git a/src/Interpreters/DDLWorker.h b/src/Interpreters/DDLWorker.h index 45218226fee..d05b9b27611 100644 --- a/src/Interpreters/DDLWorker.h +++ b/src/Interpreters/DDLWorker.h @@ -3,6 +3,7 @@ #include #include #include +#include #include #include #include @@ -125,6 +126,7 @@ protected: std::optional first_failed_task_name; std::list current_tasks; + Coordination::Stat queue_node_stat; std::shared_ptr queue_updated_event = std::make_shared(); std::shared_ptr cleanup_event = std::make_shared(); std::atomic initialized = false; diff --git a/src/Interpreters/InterpreterSystemQuery.cpp b/src/Interpreters/InterpreterSystemQuery.cpp index bdeb4a30e9e..9f344b86234 100644 --- a/src/Interpreters/InterpreterSystemQuery.cpp +++ b/src/Interpreters/InterpreterSystemQuery.cpp @@ -25,6 +25,7 @@ #include #include #include +#include #include #include #include @@ -416,7 +417,8 @@ BlockIO InterpreterSystemQuery::execute() [&] { if (auto text_log = getContext()->getTextLog()) text_log->flush(true); }, [&] { if (auto metric_log = getContext()->getMetricLog()) metric_log->flush(true); }, [&] { if (auto asynchronous_metric_log = getContext()->getAsynchronousMetricLog()) asynchronous_metric_log->flush(true); }, - [&] { if (auto opentelemetry_span_log = getContext()->getOpenTelemetrySpanLog()) opentelemetry_span_log->flush(true); } + [&] { if (auto opentelemetry_span_log = getContext()->getOpenTelemetrySpanLog()) opentelemetry_span_log->flush(true); }, + [&] { if (auto zookeeper_log = getContext()->getZooKeeperLog()) zookeeper_log->flush(true); } ); break; } diff --git a/src/Interpreters/SystemLog.cpp b/src/Interpreters/SystemLog.cpp index 31ceca8ec05..d3224a53ccd 100644 --- a/src/Interpreters/SystemLog.cpp +++ b/src/Interpreters/SystemLog.cpp @@ -8,6 +8,7 @@ #include #include #include +#include #include #include @@ -103,6 +104,7 @@ SystemLogs::SystemLogs(ContextPtr global_context, const Poco::Util::AbstractConf opentelemetry_span_log = createSystemLog( global_context, "system", "opentelemetry_span_log", config, "opentelemetry_span_log"); + zookeeper_log = createSystemLog(global_context, "system", "zookeeper_log", config, "zookeeper_log"); if (query_log) logs.emplace_back(query_log.get()); @@ -122,6 +124,8 @@ SystemLogs::SystemLogs(ContextPtr global_context, const Poco::Util::AbstractConf logs.emplace_back(asynchronous_metric_log.get()); if (opentelemetry_span_log) logs.emplace_back(opentelemetry_span_log.get()); + if (zookeeper_log) + logs.emplace_back(zookeeper_log.get()); try { diff --git a/src/Interpreters/SystemLog.h b/src/Interpreters/SystemLog.h index ee3116362e5..b94f3f7d456 100644 --- a/src/Interpreters/SystemLog.h +++ b/src/Interpreters/SystemLog.h @@ -74,6 +74,7 @@ class CrashLog; class MetricLog; class AsynchronousMetricLog; class OpenTelemetrySpanLog; +class ZooKeeperLog; class ISystemLog @@ -110,6 +111,8 @@ struct SystemLogs std::shared_ptr asynchronous_metric_log; /// OpenTelemetry trace spans. std::shared_ptr opentelemetry_span_log; + /// Used to log all actions of ZooKeeper client + std::shared_ptr zookeeper_log; std::vector logs; }; diff --git a/src/Interpreters/ZooKeeperLog.cpp b/src/Interpreters/ZooKeeperLog.cpp new file mode 100644 index 00000000000..6f24aa0e8fc --- /dev/null +++ b/src/Interpreters/ZooKeeperLog.cpp @@ -0,0 +1,130 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace DB +{ + +NamesAndTypesList ZooKeeperLogElement::getNamesAndTypes() +{ + auto event_type = std::make_shared( + DataTypeEnum8::Values + { + {"Send", static_cast(SEND)}, + {"Receive", static_cast(RECEIVE)}, + {"Finalize", static_cast(FINALIZE)}, + }); + + return + { + {"type", std::move(event_type)}, + {"event_date", std::make_shared()}, + {"event_time", std::make_shared(6)}, + {"address", DataTypeFactory::instance().get("IPv6")}, + {"port", std::make_shared()}, + {"session_id", std::make_shared()}, + + {"xid", std::make_shared()}, + {"has_watch", std::make_shared()}, + {"op_num", std::make_shared()}, + {"path", std::make_shared()}, + + {"data", std::make_shared()}, + + {"is_ephemeral", std::make_shared()}, + {"is_sequential", std::make_shared()}, + + {"version", std::make_shared(std::make_shared())}, + + {"requests_size", std::make_shared()}, + {"request_idx", std::make_shared()}, + + {"zxid", std::make_shared()}, + {"error", std::make_shared(std::make_shared())}, + + {"watch_type", std::make_shared()}, + {"watch_state", std::make_shared()}, + + {"path_created", std::make_shared()}, + + {"stat_czxid", std::make_shared()}, + {"stat_mzxid", std::make_shared()}, + {"stat_pzxid", std::make_shared()}, + {"stat_version", std::make_shared()}, + {"stat_cversion", std::make_shared()}, + {"stat_dataLength", std::make_shared()}, + {"stat_numChildren", std::make_shared()}, + + {"children", std::make_shared(std::make_shared())}, + }; +} + +void ZooKeeperLogElement::appendToBlock(MutableColumns & columns) const +{ + assert(type != UNKNOWN); + size_t i = 0; + + columns[i++]->insert(type); + auto event_time_seconds = event_time / 1000000; + columns[i++]->insert(DateLUT::instance().toDayNum(event_time_seconds).toUnderType()); + columns[i++]->insert(event_time); + columns[i++]->insert(IPv6ToBinary(address.host()).data()); + columns[i++]->insert(address.port()); + columns[i++]->insert(session_id); + + columns[i++]->insert(xid); + columns[i++]->insert(has_watch); + columns[i++]->insert(op_num); + columns[i++]->insert(path); + + columns[i++]->insert(data); + + columns[i++]->insert(is_ephemeral); + columns[i++]->insert(is_sequential); + + columns[i++]->insert(version ? Field(*version) : Field()); + + columns[i++]->insert(requests_size); + columns[i++]->insert(request_idx); + + columns[i++]->insert(zxid); + columns[i++]->insert(error ? Field(*error) : Field()); + + columns[i++]->insert(watch_type); + columns[i++]->insert(watch_state); + + columns[i++]->insert(path_created); + + columns[i++]->insert(stat.czxid); + columns[i++]->insert(stat.mzxid); + columns[i++]->insert(stat.pzxid); + columns[i++]->insert(stat.version); + columns[i++]->insert(stat.cversion); + columns[i++]->insert(stat.dataLength); + columns[i++]->insert(stat.numChildren); + + Array children_array; + for (const auto & c : children) + children_array.emplace_back(c); + columns[i++]->insert(children_array); +} + +}; diff --git a/src/Interpreters/ZooKeeperLog.h b/src/Interpreters/ZooKeeperLog.h new file mode 100644 index 00000000000..9f8756d0691 --- /dev/null +++ b/src/Interpreters/ZooKeeperLog.h @@ -0,0 +1,76 @@ +#pragma once + +#include +#include +#include +#include + + +namespace DB +{ + +struct ZooKeeperLogElement +{ + enum Type + { + UNKNOWN = 0, + SEND = 1, + RECEIVE = 2, + FINALIZE = 3 + }; + + Type type = UNKNOWN; + Decimal64 event_time = 0; + Poco::Net::SocketAddress address; + Int64 session_id = 0; + + /// Common request info + Int32 xid = 0; + bool has_watch = false; + Int32 op_num = 0; + String path; + + /// create, set + String data; + + /// create + bool is_ephemeral = false; + bool is_sequential = false; + + /// remove, check, set + std::optional version = 0; + + /// multi + UInt32 requests_size = 0; + UInt32 request_idx = 0; + + /// Common response info + Int64 zxid = 0; + std::optional error; + + /// watch + Int32 watch_type = 0; + Int32 watch_state = 0; + + /// create + String path_created; + + /// exists, get, set, list + Coordination::Stat stat = {}; + + /// list + Strings children; + + + static std::string name() { return "ZooKeeperLog"; } + static NamesAndTypesList getNamesAndTypes(); + static NamesAndAliases getNamesAndAliases() { return {}; } + void appendToBlock(MutableColumns & columns) const; +}; + +class ZooKeeperLog : public SystemLog +{ + using SystemLog::SystemLog; +}; + +} diff --git a/tests/config/config.d/zookeeper_log.xml b/tests/config/config.d/zookeeper_log.xml new file mode 100644 index 00000000000..2ec1f2446ed --- /dev/null +++ b/tests/config/config.d/zookeeper_log.xml @@ -0,0 +1,7 @@ + + + system + zookeeper_log
+ 7500 +
+
diff --git a/tests/config/install.sh b/tests/config/install.sh index 08add810cbf..4d349e9d901 100755 --- a/tests/config/install.sh +++ b/tests/config/install.sh @@ -34,6 +34,7 @@ ln -sf $SRC_PATH/config.d/logging_no_rotate.xml $DEST_SERVER_PATH/config.d/ ln -sf $SRC_PATH/config.d/tcp_with_proxy.xml $DEST_SERVER_PATH/config.d/ ln -sf $SRC_PATH/config.d/top_level_domains_lists.xml $DEST_SERVER_PATH/config.d/ ln -sf $SRC_PATH/config.d/top_level_domains_path.xml $DEST_SERVER_PATH/config.d/ +ln -sf $SRC_PATH/config.d/zookeeper_log.xml $DEST_SERVER_PATH/config.d/ ln -sf $SRC_PATH/users.d/log_queries.xml $DEST_SERVER_PATH/users.d/ ln -sf $SRC_PATH/users.d/readonly.xml $DEST_SERVER_PATH/users.d/ ln -sf $SRC_PATH/users.d/access_management.xml $DEST_SERVER_PATH/users.d/ diff --git a/tests/queries/0_stateless/01158_zookeeper_log.reference b/tests/queries/0_stateless/01158_zookeeper_log.reference new file mode 100644 index 00000000000..28553aaad0a --- /dev/null +++ b/tests/queries/0_stateless/01158_zookeeper_log.reference @@ -0,0 +1,40 @@ +log +Receive 0 0 /test/01158/default/rmt/log 0 0 0 0 0 0 4 3 0 0 0 0 +Send 0 1 /test/01158/default/rmt/log 0 0 0 0 4 \N 0 0 0 0 0 0 +Receive 0 1 /test/01158/default/rmt/log 0 0 0 0 4 0 0 0 /test/01158/default/rmt/log 0 0 0 0 +Send 0 1 /test/01158/default/rmt/log/log- 0 1 0 0 1 \N 0 0 0 0 0 0 +Receive 0 1 /test/01158/default/rmt/log/log- 0 1 0 0 1 0 0 0 /test/01158/default/rmt/log/log-0000000000 0 0 0 0 +parts +Send 0 14 0 0 0 5 0 \N 0 0 0 0 0 0 +Send 0 1 /test/01158/default/rmt/log/log- 0 1 0 0 1 \N 0 0 0 0 0 0 +Send 0 2 /test/01158/default/rmt/block_numbers/all/block-0000000000 0 0 -1 0 2 \N 0 0 0 0 0 0 +Send 0 2 /test/01158/default/rmt/temp/abandonable_lock-0000000000 0 0 -1 0 3 \N 0 0 0 0 0 0 +Send 0 1 /test/01158/default/rmt/blocks/all_6308706741995381342_2495791770474910886 0 0 0 0 4 \N 0 0 0 0 0 0 +Send 0 1 /test/01158/default/rmt/replicas/1/parts/all_0_0_0 0 0 0 0 5 \N 0 0 0 0 0 0 +Receive 0 14 0 0 0 5 0 0 0 0 0 0 0 0 +Receive 0 1 /test/01158/default/rmt/log/log- 0 1 0 0 1 0 0 0 /test/01158/default/rmt/log/log-0000000000 0 0 0 0 +Receive 0 2 /test/01158/default/rmt/block_numbers/all/block-0000000000 0 0 -1 0 2 0 0 0 0 0 0 0 +Receive 0 2 /test/01158/default/rmt/temp/abandonable_lock-0000000000 0 0 -1 0 3 0 0 0 0 0 0 0 +Receive 0 1 /test/01158/default/rmt/blocks/all_6308706741995381342_2495791770474910886 0 0 0 0 4 0 0 0 /test/01158/default/rmt/blocks/all_6308706741995381342_2495791770474910886 0 0 0 0 +Receive 0 1 /test/01158/default/rmt/replicas/1/parts/all_0_0_0 0 0 0 0 5 0 0 0 /test/01158/default/rmt/replicas/1/parts/all_0_0_0 0 0 0 0 +Send 0 3 /test/01158/default/rmt/replicas/1/parts/all_0_0_0 0 0 0 0 0 \N 0 0 0 0 0 0 +Receive 0 3 /test/01158/default/rmt/replicas/1/parts/all_0_0_0 0 0 0 0 0 0 0 0 0 0 96 0 +blocks +Send 0 14 0 0 0 3 0 \N 0 0 0 0 0 0 +Send 0 1 /test/01158/default/rmt/blocks/all_6308706741995381342_2495791770474910886 0 0 0 0 1 \N 0 0 0 0 0 0 +Send 0 2 /test/01158/default/rmt/blocks/all_6308706741995381342_2495791770474910886 0 0 -1 0 2 \N 0 0 0 0 0 0 +Send 0 1 /test/01158/default/rmt/temp/abandonable_lock- 1 1 0 0 3 \N 0 0 0 0 0 0 +Receive 0 14 0 0 0 3 0 0 0 0 0 0 0 0 +Receive 0 1 /test/01158/default/rmt/blocks/all_6308706741995381342_2495791770474910886 0 0 0 0 1 0 0 0 /test/01158/default/rmt/blocks/all_6308706741995381342_2495791770474910886 0 0 0 0 +Receive 0 2 /test/01158/default/rmt/blocks/all_6308706741995381342_2495791770474910886 0 0 -1 0 2 0 0 0 0 0 0 0 +Receive 0 1 /test/01158/default/rmt/temp/abandonable_lock- 1 1 0 0 3 0 0 0 /test/01158/default/rmt/temp/abandonable_lock-0000000000 0 0 0 0 +Send 0 14 0 0 0 3 0 \N 0 0 0 0 0 0 +Send 0 1 /test/01158/default/rmt/blocks/all_6308706741995381342_2495791770474910886 0 0 0 0 1 \N 0 0 0 0 0 0 +Send 0 2 /test/01158/default/rmt/blocks/all_6308706741995381342_2495791770474910886 0 0 -1 0 2 \N 0 0 0 0 0 0 +Send 0 1 /test/01158/default/rmt/temp/abandonable_lock- 1 1 0 0 3 \N 0 0 0 0 0 0 +Receive 0 14 0 0 0 3 0 -110 0 0 0 0 0 0 +Receive 0 -1 /test/01158/default/rmt/blocks/all_6308706741995381342_2495791770474910886 0 0 0 0 1 -110 0 0 0 0 0 0 +Receive 0 -1 /test/01158/default/rmt/blocks/all_6308706741995381342_2495791770474910886 0 0 -1 0 2 -2 0 0 0 0 0 0 +Receive 0 -1 /test/01158/default/rmt/temp/abandonable_lock- 1 1 0 0 3 -2 0 0 0 0 0 0 +Send 0 4 /test/01158/default/rmt/blocks/all_6308706741995381342_2495791770474910886 0 0 0 0 0 \N 0 0 0 0 0 0 +Receive 0 4 /test/01158/default/rmt/blocks/all_6308706741995381342_2495791770474910886 0 0 0 0 0 0 0 0 0 0 9 0 diff --git a/tests/queries/0_stateless/01158_zookeeper_log.sql b/tests/queries/0_stateless/01158_zookeeper_log.sql new file mode 100644 index 00000000000..2186fa60346 --- /dev/null +++ b/tests/queries/0_stateless/01158_zookeeper_log.sql @@ -0,0 +1,27 @@ +drop table if exists rmt; +create table rmt (n int) engine=ReplicatedMergeTree('/test/01158/{database}/rmt', '1') order by n; +insert into rmt values (1); +insert into rmt values (1); +system flush logs; + +select 'log'; +select type, has_watch, op_num, path, is_ephemeral, is_sequential, version, requests_size, request_idx, error, watch_type, + watch_state, path_created, stat_version, stat_cversion, stat_dataLength, stat_numChildren +from system.zookeeper_log where path like '/test/01158/' || currentDatabase() || '/rmt/log%' and op_num not in (3, 4, 12) +order by xid, type, request_idx; + +select 'parts'; +select type, has_watch, op_num, path, is_ephemeral, is_sequential, version, requests_size, request_idx, error, watch_type, + watch_state, path_created, stat_version, stat_cversion, stat_dataLength, stat_numChildren +from system.zookeeper_log +where (session_id, xid) in (select session_id, xid from system.zookeeper_log where path='/test/01158/' || currentDatabase() || '/rmt/replicas/1/parts/all_0_0_0') +order by xid, type, request_idx; + +select 'blocks'; +select type, has_watch, op_num, path, is_ephemeral, is_sequential, version, requests_size, request_idx, error, watch_type, + watch_state, path_created, stat_version, stat_cversion, stat_dataLength, stat_numChildren +from system.zookeeper_log +where (session_id, xid) in (select session_id, xid from system.zookeeper_log where path like '/test/01158/' || currentDatabase() || '/rmt/blocks%' and op_num not in (1, 12)) +order by xid, type, request_idx; + +drop table rmt;