From 01d990edd49b062a076d733b212509ff217984ef Mon Sep 17 00:00:00 2001 From: speeedmaster Date: Thu, 6 Jun 2024 21:27:47 +0000 Subject: [PATCH] add tests and fix issues --- programs/keeper/Keeper.cpp | 4 +- programs/keeper/dashboard.html | 6 +- src/Coordination/KeeperDispatcher.cpp | 28 ++++ src/Coordination/KeeperDispatcher.h | 3 + src/Server/HTTP/HTTPServerRequest.cpp | 12 ++ src/Server/HTTP/HTTPServerRequest.h | 2 + src/Server/HTTPRequestHandlerFactoryMain.cpp | 5 +- src/Server/KeeperDashboardRequestHandler.cpp | 5 +- src/Server/KeeperHTTPHandlerFactory.cpp | 38 +++--- src/Server/KeeperHTTPStorageHandler.cpp | 38 +++--- .../test_keeper_http_control/test.py | 25 ++++ .../__init__.py | 0 .../configs/enable_keeper1.xml | 37 ++++++ .../configs/enable_keeper2.xml | 37 ++++++ .../configs/enable_keeper3.xml | 37 ++++++ .../test_keeper_http_storage_control/test.py | 121 ++++++++++++++++++ 16 files changed, 346 insertions(+), 52 deletions(-) create mode 100644 tests/integration/test_keeper_http_storage_control/__init__.py create mode 100644 tests/integration/test_keeper_http_storage_control/configs/enable_keeper1.xml create mode 100644 tests/integration/test_keeper_http_storage_control/configs/enable_keeper2.xml create mode 100644 tests/integration/test_keeper_http_storage_control/configs/enable_keeper3.xml create mode 100644 tests/integration/test_keeper_http_storage_control/test.py diff --git a/programs/keeper/Keeper.cpp b/programs/keeper/Keeper.cpp index 3eb41e317aa..0dc5bdb92ac 100644 --- a/programs/keeper/Keeper.cpp +++ b/programs/keeper/Keeper.cpp @@ -546,7 +546,7 @@ try }); /// HTTPS control endpoints - port_name = "keeper_server.http_control.https_port"; + port_name = "keeper_server.http_control.secure_port"; createServer(listen_host, port_name, listen_try, [&](UInt16 port) mutable { auto my_http_context = httpContext(); @@ -558,7 +558,7 @@ try servers->emplace_back( listen_host, port_name, - "HTTP Control: http://" + address.toString(), + "HTTPS Control: https://" + address.toString(), std::make_unique( std::move(my_http_context), createKeeperHTTPHandlerFactory(*this, config, global_context->getKeeperDispatcher(), "KeeperHTTPSHandler-factory"), diff --git a/programs/keeper/dashboard.html b/programs/keeper/dashboard.html index 171e5cfc515..c98b4613f8e 100644 --- a/programs/keeper/dashboard.html +++ b/programs/keeper/dashboard.html @@ -689,13 +689,13 @@ document.getElementById('displayStoragePath').innerText = this.selected_node?.path ?? '--'; // stat document.getElementById('displayStorageAVersion').innerText = this.selected_node?.stat?.aversion ?? '--'; - document.getElementById('displayStorageCZxid').innerText = this.selected_node?.stat?.cZxid ?? '--'; + document.getElementById('displayStorageCZxid').innerText = this.selected_node?.stat?.czxid ?? '--'; document.getElementById('displayStorageCVersion').innerText = this.selected_node?.stat?.cversion ?? '--'; document.getElementById('displayStorageDataLength').innerText = this.selected_node?.stat?.dataLength ?? '--'; document.getElementById('displayStorageEphemeralOwner').innerText = this.selected_node?.stat?.ephemeralOwner ?? '--'; - document.getElementById('displayStorageMZxid').innerText = this.selected_node?.stat?.mZxid ?? '--'; + document.getElementById('displayStorageMZxid').innerText = this.selected_node?.stat?.mzxid ?? '--'; document.getElementById('displayStorageNumChildren').innerText = this.selected_node?.stat?.numChildren ?? '--'; - document.getElementById('displayStoragePZxid').innerText = this.selected_node?.stat?.pZxid ?? '--'; + document.getElementById('displayStoragePZxid').innerText = this.selected_node?.stat?.pzxid ?? '--'; document.getElementById('displayStorageVersion').innerText = this.selected_node?.stat?.version ?? '--'; if (this.selected_node?.stat?.mtime != undefined) { document.getElementById('displayStorageMtime').innerText = new Date(this.selected_node?.stat?.mtime).toLocaleString('en-US', { hour12: false }); diff --git a/src/Coordination/KeeperDispatcher.cpp b/src/Coordination/KeeperDispatcher.cpp index 71bf2a53533..267fb8f88d7 100644 --- a/src/Coordination/KeeperDispatcher.cpp +++ b/src/Coordination/KeeperDispatcher.cpp @@ -425,6 +425,34 @@ bool KeeperDispatcher::putRequest(const Coordination::ZooKeeperRequestPtr & requ return true; } +bool KeeperDispatcher::putLocalReadRequest(const Coordination::ZooKeeperRequestPtr & request, int64_t session_id) +{ + if (!request->isReadRequest()) + { + throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot put non-read request locally"); + } + + { + /// If session was already disconnected than we will ignore requests + std::lock_guard lock(session_to_response_callback_mutex); + if (!session_to_response_callback.contains(session_id)) + return false; + } + + KeeperStorage::RequestForSession request_info; + request_info.request = request; + using namespace std::chrono; + request_info.time = duration_cast(system_clock::now().time_since_epoch()).count(); + request_info.session_id = session_id; + + if (keeper_context->isShutdownCalled()) + return false; + + server->putLocalReadRequest(request_info); + CurrentMetrics::add(CurrentMetrics::KeeperOutstandingRequets); + return true; +} + void KeeperDispatcher::initialize(const Poco::Util::AbstractConfiguration & config, bool standalone_keeper, bool start_async, const MultiVersion::Version & macros) { LOG_DEBUG(log, "Initializing storage dispatcher"); diff --git a/src/Coordination/KeeperDispatcher.h b/src/Coordination/KeeperDispatcher.h index 651fd0e1c88..846d8ca5e79 100644 --- a/src/Coordination/KeeperDispatcher.h +++ b/src/Coordination/KeeperDispatcher.h @@ -142,6 +142,9 @@ public: /// Put request to ClickHouse Keeper bool putRequest(const Coordination::ZooKeeperRequestPtr & request, int64_t session_id); + /// Put local read request to ClickHouse Keeper + bool putLocalReadRequest(const Coordination::ZooKeeperRequestPtr & request, int64_t session_id); + /// Get new session ID int64_t getSessionID(int64_t session_timeout_ms); diff --git a/src/Server/HTTP/HTTPServerRequest.cpp b/src/Server/HTTP/HTTPServerRequest.cpp index 3e82ec82550..ddde8b17a38 100644 --- a/src/Server/HTTP/HTTPServerRequest.cpp +++ b/src/Server/HTTP/HTTPServerRequest.cpp @@ -173,4 +173,16 @@ void HTTPServerRequest::readRequest(ReadBuffer & in) setVersion(version); } +std::string HTTPServerRequest::toStringForLogging() const { + return fmt::format( + "Method: {}, Address: {}, User-Agent: {}{}, Content Type: {}, Transfer Encoding: {}, X-Forwarded-For: {}", + getMethod(), + clientAddress().toString(), + get("User-Agent", "(none)"), + (hasContentLength() ? (", Length: " + std::to_string(getContentLength())) : ("")), + getContentType(), + getTransferEncoding(), + get("X-Forwarded-For", "(none)")); +} + } diff --git a/src/Server/HTTP/HTTPServerRequest.h b/src/Server/HTTP/HTTPServerRequest.h index aaec89ab757..3ad1b5589cd 100644 --- a/src/Server/HTTP/HTTPServerRequest.h +++ b/src/Server/HTTP/HTTPServerRequest.h @@ -48,6 +48,8 @@ public: Poco::Net::X509Certificate peerCertificate() const; #endif + std::string toStringForLogging() const; + private: /// Limits for basic sanity checks when reading a header enum Limits diff --git a/src/Server/HTTPRequestHandlerFactoryMain.cpp b/src/Server/HTTPRequestHandlerFactoryMain.cpp index 48c2ab21468..90fc105053c 100644 --- a/src/Server/HTTPRequestHandlerFactoryMain.cpp +++ b/src/Server/HTTPRequestHandlerFactoryMain.cpp @@ -13,10 +13,7 @@ HTTPRequestHandlerFactoryMain::HTTPRequestHandlerFactoryMain(const std::string & std::unique_ptr HTTPRequestHandlerFactoryMain::createRequestHandler(const HTTPServerRequest & request) { - LOG_TRACE(log, "HTTP Request for {}. Method: {}, Address: {}, User-Agent: {}{}, Content Type: {}, Transfer Encoding: {}, X-Forwarded-For: {}", - name, request.getMethod(), request.clientAddress().toString(), request.get("User-Agent", "(none)"), - (request.hasContentLength() ? (", Length: " + std::to_string(request.getContentLength())) : ("")), - request.getContentType(), request.getTransferEncoding(), request.get("X-Forwarded-For", "(none)")); + LOG_TRACE(log, "HTTP Request for {}. {}", name, request.toStringForLogging()); for (auto & handler_factory : child_factories) { diff --git a/src/Server/KeeperDashboardRequestHandler.cpp b/src/Server/KeeperDashboardRequestHandler.cpp index 36623b6cb28..92200fa0161 100644 --- a/src/Server/KeeperDashboardRequestHandler.cpp +++ b/src/Server/KeeperDashboardRequestHandler.cpp @@ -29,9 +29,6 @@ namespace DB void KeeperDashboardWebUIRequestHandler::handleRequest( HTTPServerRequest & request, HTTPServerResponse & response, const ProfileEvents::Event &) { - /// Raw config reference is used here to avoid dependency on Context and ServerSettings. - /// This is painful, because this class is also used in a build with CLICKHOUSE_KEEPER_STANDALONE_BUILD=1 - /// And there ordinary Context is replaced with a tiny clone. const auto & config = server.config(); const auto keep_alive_timeout = config.getUInt("keep_alive_timeout", DEFAULT_HTTP_KEEP_ALIVE_TIMEOUT); @@ -103,7 +100,7 @@ catch (...) } catch (...) { - LOG_ERROR((getLogger("KeeperDashboardContentRequestHandler")), "Cannot send exception to client"); + LOG_ERROR(getLogger("KeeperDashboardContentRequestHandler"), "Cannot send exception to client"); } } diff --git a/src/Server/KeeperHTTPHandlerFactory.cpp b/src/Server/KeeperHTTPHandlerFactory.cpp index cc44cdab486..b568625d562 100644 --- a/src/Server/KeeperHTTPHandlerFactory.cpp +++ b/src/Server/KeeperHTTPHandlerFactory.cpp @@ -37,17 +37,7 @@ KeeperHTTPRequestHandlerFactory::KeeperHTTPRequestHandlerFactory(const std::stri std::unique_ptr KeeperHTTPRequestHandlerFactory::createRequestHandler(const HTTPServerRequest & request) { - LOG_TRACE( - log, - "HTTP Request for {}. Method: {}, Address: {}, User-Agent: {}{}, Content Type: {}, Transfer Encoding: {}, X-Forwarded-For: {}", - name, - request.getMethod(), - request.clientAddress().toString(), - request.get("User-Agent", "(none)"), - (request.hasContentLength() ? (", Length: " + std::to_string(request.getContentLength())) : ("")), - request.getContentType(), - request.getTransferEncoding(), - request.get("X-Forwarded-For", "(none)")); + LOG_TRACE(log, "HTTP Request for {}. {}", name, request.toStringForLogging()); for (auto & handler_factory : child_factories) { @@ -87,6 +77,20 @@ void addDashboardHandlersToFactory( factory.addHandler(dashboard_content_handler); } +void addReadinessHandlerToFactory( + KeeperHTTPRequestHandlerFactory & factory, + std::shared_ptr keeper_dispatcher, + const Poco::Util::AbstractConfiguration & config) +{ + auto creator = [keeper_dispatcher]() -> std::unique_ptr + { return std::make_unique(keeper_dispatcher); }; + auto readiness_handler = std::make_shared>(std::move(creator)); + readiness_handler->attachStrictPath(config.getString("keeper_server.http_control.readiness.endpoint", "/ready")); + readiness_handler->allowGetAndHeadRequest(); + factory.addPathToHints("/ready"); + factory.addHandler(readiness_handler); +} + void addCommandsHandlersToFactory( KeeperHTTPRequestHandlerFactory & factory, const IServer & server, std::shared_ptr keeper_dispatcher) { @@ -121,14 +125,7 @@ void addDefaultHandlersToFactory( std::shared_ptr keeper_dispatcher, const Poco::Util::AbstractConfiguration & config) { - auto readiness_creator = [keeper_dispatcher]() -> std::unique_ptr - { return std::make_unique(keeper_dispatcher); }; - auto readiness_handler = std::make_shared>(std::move(readiness_creator)); - readiness_handler->attachStrictPath(config.getString("keeper_server.http_control.readiness.endpoint", "/ready")); - readiness_handler->allowGetAndHeadRequest(); - factory.addPathToHints("/ready"); - factory.addHandler(readiness_handler); - + addReadinessHandlerToFactory(factory, keeper_dispatcher, config); addDashboardHandlersToFactory(factory, server, keeper_dispatcher); addCommandsHandlersToFactory(factory, server, keeper_dispatcher); addStorageHandlersToFactory(factory, server, keeper_dispatcher); @@ -163,7 +160,8 @@ static inline auto createHandlersFactoryFromConfig( "{}.{}.handler.type", prefix, key); - + if (handler_type == "ready") + addReadinessHandlerToFactory(*main_handler_factory, keeper_dispatcher, config); if (handler_type == "dashboard") addDashboardHandlersToFactory(*main_handler_factory, server, keeper_dispatcher); if (handler_type == "commands") diff --git a/src/Server/KeeperHTTPStorageHandler.cpp b/src/Server/KeeperHTTPStorageHandler.cpp index 472bab8ba9e..aeb3958cfb1 100644 --- a/src/Server/KeeperHTTPStorageHandler.cpp +++ b/src/Server/KeeperHTTPStorageHandler.cpp @@ -11,6 +11,7 @@ #include #include +#include #include #include @@ -27,9 +28,9 @@ extern const int TIMEOUT_EXCEEDED; Poco::JSON::Object toJSON(const Coordination::Stat & stat) { Poco::JSON::Object result; - result.set("cZxid", stat.czxid); - result.set("mZxid", stat.mzxid); - result.set("pZxid", stat.pzxid); + result.set("czxid", stat.czxid); + result.set("mzxid", stat.mzxid); + result.set("pzxid", stat.pzxid); result.set("ctime", stat.ctime); result.set("mtime", stat.mtime); result.set("version", stat.version); @@ -53,7 +54,7 @@ std::optional getVersionFromRequest(const HTTPServerRequest & request) try { - return std::stoi(version_param->second); + return parse(version_param->second); } catch (...) { @@ -136,27 +137,26 @@ Coordination::ResponsePtr KeeperHTTPStorageHandler::awaitKeeperResponse(std::sha = [response_promise](const Coordination::ResponsePtr & zk_response) mutable { response_promise->set_value(zk_response); }; const auto session_id = keeper_dispatcher->getSessionID(session_timeout.totalMilliseconds()); + keeper_dispatcher->registerSession(session_id, response_callback); + SCOPE_EXIT({ keeper_dispatcher->finishSession(session_id); }); - try + if (request->isReadRequest()) { - if (!keeper_dispatcher->putRequest(std::move(request), session_id)) - throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "Session {} already disconnected", session_id); - - if (response_future.wait_for(std::chrono::milliseconds(operation_timeout.totalMilliseconds())) != std::future_status::ready) - throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "Operation timeout ({} ms) exceeded.", operation_timeout.totalMilliseconds()); - - auto result = response_future.get(); - - keeper_dispatcher->finishSession(session_id); - return result; + keeper_dispatcher->putLocalReadRequest(std::move(request), session_id); } - catch (...) + else if (!keeper_dispatcher->putRequest(std::move(request), session_id)) { - // it is obligatory to finish the session, as it affects num_alive_connections metric - keeper_dispatcher->finishSession(session_id); - throw; + throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "Session {} already disconnected", session_id); } + + if (response_future.wait_for(std::chrono::milliseconds(operation_timeout.totalMilliseconds())) != std::future_status::ready) + throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "Operation timeout ({} ms) exceeded.", operation_timeout.totalMilliseconds()); + + auto result = response_future.get(); + + keeper_dispatcher->finishSession(session_id); + return result; } void KeeperHTTPStorageHandler::performZooKeeperExistsRequest(const std::string & storage_path, HTTPServerResponse & response) diff --git a/tests/integration/test_keeper_http_control/test.py b/tests/integration/test_keeper_http_control/test.py index 65dc5bea909..184e7c37030 100644 --- a/tests/integration/test_keeper_http_control/test.py +++ b/tests/integration/test_keeper_http_control/test.py @@ -72,3 +72,28 @@ def test_http_readiness_partitioned_cluster(started_cluster): assert readiness_data["status"] == "fail" assert readiness_data["details"]["role"] == "follower" assert readiness_data["details"]["hasLeader"] == False + +def test_http_commands_basic_responses(started_cluster): + leader = keeper_utils.get_leader(cluster, [node1, node2, node3]) + response = requests.get( + "http://{host}:{port}/api/v1/commands/conf".format( + host=leader.ip_address, port=9182 + ) + ) + assert response.status_code == 200 + + command_data = response.json() + assert command_data["result"] == keeper_utils.send_4lw_cmd(cluster, leader, "conf") + + follower = keeper_utils.get_any_follower(cluster, [node1, node2, node3]) + response = requests.get( + "http://{host}:{port}/api/v1/commands/conf".format( + host=follower.ip_address, port=9182 + ) + ) + assert response.status_code == 200 + + command_data = response.json() + assert command_data["result"] == keeper_utils.send_4lw_cmd( + cluster, follower, "conf" + ) diff --git a/tests/integration/test_keeper_http_storage_control/__init__.py b/tests/integration/test_keeper_http_storage_control/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/integration/test_keeper_http_storage_control/configs/enable_keeper1.xml b/tests/integration/test_keeper_http_storage_control/configs/enable_keeper1.xml new file mode 100644 index 00000000000..20e3c307f31 --- /dev/null +++ b/tests/integration/test_keeper_http_storage_control/configs/enable_keeper1.xml @@ -0,0 +1,37 @@ + + + 9181 + 1 + /var/lib/clickhouse/coordination/log + /var/lib/clickhouse/coordination/snapshots + + + 5000 + 10000 + trace + + + + + 1 + node1 + 9234 + + + 2 + node2 + 9234 + true + + + 3 + node3 + 9234 + true + + + + 9182 + + + diff --git a/tests/integration/test_keeper_http_storage_control/configs/enable_keeper2.xml b/tests/integration/test_keeper_http_storage_control/configs/enable_keeper2.xml new file mode 100644 index 00000000000..b9002eb2436 --- /dev/null +++ b/tests/integration/test_keeper_http_storage_control/configs/enable_keeper2.xml @@ -0,0 +1,37 @@ + + + 9181 + 2 + /var/lib/clickhouse/coordination/log + /var/lib/clickhouse/coordination/snapshots + + + 5000 + 10000 + trace + + + + + 1 + node1 + 9234 + + + 2 + node2 + 9234 + true + + + 3 + node3 + 9234 + true + + + + 9182 + + + diff --git a/tests/integration/test_keeper_http_storage_control/configs/enable_keeper3.xml b/tests/integration/test_keeper_http_storage_control/configs/enable_keeper3.xml new file mode 100644 index 00000000000..6e4e17399f7 --- /dev/null +++ b/tests/integration/test_keeper_http_storage_control/configs/enable_keeper3.xml @@ -0,0 +1,37 @@ + + + 9181 + 3 + /var/lib/clickhouse/coordination/log + /var/lib/clickhouse/coordination/snapshots + + + 5000 + 10000 + trace + + + + + 1 + node1 + 9234 + + + 2 + node2 + 9234 + true + + + 3 + node3 + 9234 + true + + + + 9182 + + + diff --git a/tests/integration/test_keeper_http_storage_control/test.py b/tests/integration/test_keeper_http_storage_control/test.py new file mode 100644 index 00000000000..14e2f8399ca --- /dev/null +++ b/tests/integration/test_keeper_http_storage_control/test.py @@ -0,0 +1,121 @@ +#!/usr/bin/env python3 + +import os +import pytest +import requests + +from helpers.cluster import ClickHouseCluster +from helpers.network import PartitionManager +import helpers.keeper_utils as keeper_utils + +cluster = ClickHouseCluster(__file__) +CONFIG_DIR = os.path.join(os.path.dirname(os.path.realpath(__file__)), "configs") + +node1 = cluster.add_instance( + "node1", main_configs=["configs/enable_keeper1.xml"], stay_alive=True +) +node2 = cluster.add_instance( + "node2", main_configs=["configs/enable_keeper2.xml"], stay_alive=True +) +node3 = cluster.add_instance( + "node3", main_configs=["configs/enable_keeper3.xml"], stay_alive=True +) + + +@pytest.fixture(scope="module") +def started_cluster(): + try: + cluster.start() + yield cluster + finally: + cluster.shutdown() + + +def send_storage_request( + node, path, data=None, params=None, expected_response_code=200 +): + response = requests.post( + "http://{host}:9182/api/v1/storage{path}".format( + host=node.ip_address, path=path + ), + data=data, + params=params, + ) + assert response.status_code == expected_response_code + return response + + +def test_keeper_http_storage_create_get_exists(started_cluster): + follower = keeper_utils.get_any_follower(cluster, [node1, node2, node3]) + + test_content = b"test_data" + send_storage_request(follower, "/create/test_storage_get", test_content) + + send_storage_request(follower, "/exists/test_storage_get") + + response = send_storage_request(follower, "/get/test_storage_get") + assert response.content == test_content + + send_storage_request( + follower, "/get/test_storage_get/not_found", expected_response_code=404 + ) + send_storage_request( + follower, "/exists/test_storage_get/not_found", expected_response_code=404 + ) + + +def test_keeper_http_storage_set(started_cluster): + follower = keeper_utils.get_any_follower(cluster, [node1, node2, node3]) + + send_storage_request(follower, "/create/test_storage_set") + + response = send_storage_request(follower, "/get/test_storage_set") + assert response.content == b"" + + test_content = b"test_content" + send_storage_request( + follower, "/set/test_storage_set", test_content, params={"version": 0} + ) + + response = send_storage_request(follower, "/get/test_storage_set") + assert response.content == test_content + + # version is not set + send_storage_request( + follower, "/set/test_storage_set", test_content, expected_response_code=400 + ) + + # node not found + send_storage_request( + follower, + "/set/test_storage_set/not_found", + test_content, + params={"version": 0}, + expected_response_code=404, + ) + + +def test_keeper_http_storage_list_remove(started_cluster): + follower = keeper_utils.get_any_follower(cluster, [node1, node2, node3]) + + send_storage_request(follower, "/create/test_storage_list") + send_storage_request(follower, "/create/test_storage_list/a") + send_storage_request(follower, "/create/test_storage_list/b") + send_storage_request(follower, "/create/test_storage_list/c") + + response = send_storage_request(follower, "/list/test_storage_list") + assert sorted(response.json()["child_node_names"]) == ["a", "b", "c"] + + send_storage_request(follower, "/remove/test_storage_list/b", params={"version": 0}) + + response = send_storage_request(follower, "/list/test_storage_list") + assert sorted(response.json()["child_node_names"]) == ["a", "c"] + + # version is not set + send_storage_request( + follower, "/remove/test_storage_list/a", expected_response_code=400 + ) + + response = send_storage_request( + follower, "/list/test_storage_list/not_found", expected_response_code=404 + )