add tests and fix issues

This commit is contained in:
speeedmaster 2024-06-06 21:27:47 +00:00 committed by Aleksandr Tolkachev
parent cc6b8a81a2
commit 01d990edd4
16 changed files with 346 additions and 52 deletions

View File

@ -546,7 +546,7 @@ try
});
/// HTTPS control endpoints
port_name = "keeper_server.http_control.https_port";
port_name = "keeper_server.http_control.secure_port";
createServer(listen_host, port_name, listen_try, [&](UInt16 port) mutable
{
auto my_http_context = httpContext();
@ -558,7 +558,7 @@ try
servers->emplace_back(
listen_host,
port_name,
"HTTP Control: http://" + address.toString(),
"HTTPS Control: https://" + address.toString(),
std::make_unique<HTTPServer>(
std::move(my_http_context),
createKeeperHTTPHandlerFactory(*this, config, global_context->getKeeperDispatcher(), "KeeperHTTPSHandler-factory"),

View File

@ -689,13 +689,13 @@
document.getElementById('displayStoragePath').innerText = this.selected_node?.path ?? '--';
// stat
document.getElementById('displayStorageAVersion').innerText = this.selected_node?.stat?.aversion ?? '--';
document.getElementById('displayStorageCZxid').innerText = this.selected_node?.stat?.cZxid ?? '--';
document.getElementById('displayStorageCZxid').innerText = this.selected_node?.stat?.czxid ?? '--';
document.getElementById('displayStorageCVersion').innerText = this.selected_node?.stat?.cversion ?? '--';
document.getElementById('displayStorageDataLength').innerText = this.selected_node?.stat?.dataLength ?? '--';
document.getElementById('displayStorageEphemeralOwner').innerText = this.selected_node?.stat?.ephemeralOwner ?? '--';
document.getElementById('displayStorageMZxid').innerText = this.selected_node?.stat?.mZxid ?? '--';
document.getElementById('displayStorageMZxid').innerText = this.selected_node?.stat?.mzxid ?? '--';
document.getElementById('displayStorageNumChildren').innerText = this.selected_node?.stat?.numChildren ?? '--';
document.getElementById('displayStoragePZxid').innerText = this.selected_node?.stat?.pZxid ?? '--';
document.getElementById('displayStoragePZxid').innerText = this.selected_node?.stat?.pzxid ?? '--';
document.getElementById('displayStorageVersion').innerText = this.selected_node?.stat?.version ?? '--';
if (this.selected_node?.stat?.mtime != undefined) {
document.getElementById('displayStorageMtime').innerText = new Date(this.selected_node?.stat?.mtime).toLocaleString('en-US', { hour12: false });

View File

@ -425,6 +425,34 @@ bool KeeperDispatcher::putRequest(const Coordination::ZooKeeperRequestPtr & requ
return true;
}
/// Hands a read request straight to the local server (via server->putLocalReadRequest).
/// Throws LOGICAL_ERROR if the request is not a read request.
/// Returns false, without submitting anything, when the session has no registered response
/// callback or when shutdown has been requested; returns true once the request was submitted.
/// NOTE(review): KeeperOutstandingRequets is incremented here; confirm that it is decremented
/// somewhere on the local read path.
bool KeeperDispatcher::putLocalReadRequest(const Coordination::ZooKeeperRequestPtr & request, int64_t session_id)
{
    if (!request->isReadRequest())
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot put non-read request locally");

    /// If the session was already disconnected then its requests are ignored.
    /// The mutex is held only for the lookup itself.
    bool session_is_registered = false;
    {
        std::lock_guard lock(session_to_response_callback_mutex);
        session_is_registered = session_to_response_callback.contains(session_id);
    }
    if (!session_is_registered)
        return false;

    /// Wall-clock time of submission, in milliseconds since the epoch.
    const auto since_epoch = std::chrono::system_clock::now().time_since_epoch();

    KeeperStorage::RequestForSession request_info;
    request_info.request = request;
    request_info.time = std::chrono::duration_cast<std::chrono::milliseconds>(since_epoch).count();
    request_info.session_id = session_id;

    if (keeper_context->isShutdownCalled())
        return false;

    server->putLocalReadRequest(request_info);
    CurrentMetrics::add(CurrentMetrics::KeeperOutstandingRequets);
    return true;
}
void KeeperDispatcher::initialize(const Poco::Util::AbstractConfiguration & config, bool standalone_keeper, bool start_async, const MultiVersion<Macros>::Version & macros)
{
LOG_DEBUG(log, "Initializing storage dispatcher");

View File

@ -142,6 +142,9 @@ public:
/// Put request to ClickHouse Keeper
bool putRequest(const Coordination::ZooKeeperRequestPtr & request, int64_t session_id);
/// Put local read request to ClickHouse Keeper
bool putLocalReadRequest(const Coordination::ZooKeeperRequestPtr & request, int64_t session_id);
/// Get new session ID
int64_t getSessionID(int64_t session_timeout_ms);

View File

@ -173,4 +173,16 @@ void HTTPServerRequest::readRequest(ReadBuffer & in)
setVersion(version);
}
/// Builds a one-line summary of the request for log messages: method, client address,
/// User-Agent, optional content length, content type, transfer encoding and X-Forwarded-For.
/// Missing User-Agent / X-Forwarded-For headers are rendered as "(none)".
std::string HTTPServerRequest::toStringForLogging() const
{
    /// The ", Length: N" fragment appears only when the request has a content length.
    std::string length_fragment;
    if (hasContentLength())
        length_fragment = ", Length: " + std::to_string(getContentLength());

    return fmt::format(
        "Method: {}, Address: {}, User-Agent: {}{}, Content Type: {}, Transfer Encoding: {}, X-Forwarded-For: {}",
        getMethod(),
        clientAddress().toString(),
        get("User-Agent", "(none)"),
        length_fragment,
        getContentType(),
        getTransferEncoding(),
        get("X-Forwarded-For", "(none)"));
}
}

View File

@ -48,6 +48,8 @@ public:
Poco::Net::X509Certificate peerCertificate() const;
#endif
std::string toStringForLogging() const;
private:
/// Limits for basic sanity checks when reading a header
enum Limits

View File

@ -13,10 +13,7 @@ HTTPRequestHandlerFactoryMain::HTTPRequestHandlerFactoryMain(const std::string &
std::unique_ptr<HTTPRequestHandler> HTTPRequestHandlerFactoryMain::createRequestHandler(const HTTPServerRequest & request)
{
LOG_TRACE(log, "HTTP Request for {}. Method: {}, Address: {}, User-Agent: {}{}, Content Type: {}, Transfer Encoding: {}, X-Forwarded-For: {}",
name, request.getMethod(), request.clientAddress().toString(), request.get("User-Agent", "(none)"),
(request.hasContentLength() ? (", Length: " + std::to_string(request.getContentLength())) : ("")),
request.getContentType(), request.getTransferEncoding(), request.get("X-Forwarded-For", "(none)"));
LOG_TRACE(log, "HTTP Request for {}. {}", name, request.toStringForLogging());
for (auto & handler_factory : child_factories)
{

View File

@ -29,9 +29,6 @@ namespace DB
void KeeperDashboardWebUIRequestHandler::handleRequest(
HTTPServerRequest & request, HTTPServerResponse & response, const ProfileEvents::Event &)
{
/// Raw config reference is used here to avoid dependency on Context and ServerSettings.
/// This is painful, because this class is also used in a build with CLICKHOUSE_KEEPER_STANDALONE_BUILD=1
/// And there ordinary Context is replaced with a tiny clone.
const auto & config = server.config();
const auto keep_alive_timeout = config.getUInt("keep_alive_timeout", DEFAULT_HTTP_KEEP_ALIVE_TIMEOUT);
@ -103,7 +100,7 @@ catch (...)
}
catch (...)
{
LOG_ERROR((getLogger("KeeperDashboardContentRequestHandler")), "Cannot send exception to client");
LOG_ERROR(getLogger("KeeperDashboardContentRequestHandler"), "Cannot send exception to client");
}
}

View File

@ -37,17 +37,7 @@ KeeperHTTPRequestHandlerFactory::KeeperHTTPRequestHandlerFactory(const std::stri
std::unique_ptr<HTTPRequestHandler> KeeperHTTPRequestHandlerFactory::createRequestHandler(const HTTPServerRequest & request)
{
LOG_TRACE(
log,
"HTTP Request for {}. Method: {}, Address: {}, User-Agent: {}{}, Content Type: {}, Transfer Encoding: {}, X-Forwarded-For: {}",
name,
request.getMethod(),
request.clientAddress().toString(),
request.get("User-Agent", "(none)"),
(request.hasContentLength() ? (", Length: " + std::to_string(request.getContentLength())) : ("")),
request.getContentType(),
request.getTransferEncoding(),
request.get("X-Forwarded-For", "(none)"));
LOG_TRACE(log, "HTTP Request for {}. {}", name, request.toStringForLogging());
for (auto & handler_factory : child_factories)
{
@ -87,6 +77,20 @@ void addDashboardHandlersToFactory(
factory.addHandler(dashboard_content_handler);
}
/// Registers the readiness handler on `factory`.
/// The handler is attached (strict path match, GET and HEAD only) to the endpoint read from
/// `keeper_server.http_control.readiness.endpoint`, which defaults to "/ready".
/// The same endpoint is added to the factory's path hints, so the hint always names the path
/// the handler is really attached to, even when the endpoint is overridden in the config.
void addReadinessHandlerToFactory(
    KeeperHTTPRequestHandlerFactory & factory,
    std::shared_ptr<KeeperDispatcher> keeper_dispatcher,
    const Poco::Util::AbstractConfiguration & config)
{
    /// Read the endpoint once so that the attached path and the hint cannot diverge.
    const std::string readiness_endpoint = config.getString("keeper_server.http_control.readiness.endpoint", "/ready");

    auto creator = [keeper_dispatcher]() -> std::unique_ptr<KeeperHTTPReadinessHandler>
    { return std::make_unique<KeeperHTTPReadinessHandler>(keeper_dispatcher); };

    auto readiness_handler = std::make_shared<HandlingRuleHTTPHandlerFactory<KeeperHTTPReadinessHandler>>(std::move(creator));
    readiness_handler->attachStrictPath(readiness_endpoint);
    readiness_handler->allowGetAndHeadRequest();

    factory.addPathToHints(readiness_endpoint);
    factory.addHandler(readiness_handler);
}
void addCommandsHandlersToFactory(
KeeperHTTPRequestHandlerFactory & factory, const IServer & server, std::shared_ptr<KeeperDispatcher> keeper_dispatcher)
{
@ -121,14 +125,7 @@ void addDefaultHandlersToFactory(
std::shared_ptr<KeeperDispatcher> keeper_dispatcher,
const Poco::Util::AbstractConfiguration & config)
{
auto readiness_creator = [keeper_dispatcher]() -> std::unique_ptr<KeeperHTTPReadinessHandler>
{ return std::make_unique<KeeperHTTPReadinessHandler>(keeper_dispatcher); };
auto readiness_handler = std::make_shared<HandlingRuleHTTPHandlerFactory<KeeperHTTPReadinessHandler>>(std::move(readiness_creator));
readiness_handler->attachStrictPath(config.getString("keeper_server.http_control.readiness.endpoint", "/ready"));
readiness_handler->allowGetAndHeadRequest();
factory.addPathToHints("/ready");
factory.addHandler(readiness_handler);
addReadinessHandlerToFactory(factory, keeper_dispatcher, config);
addDashboardHandlersToFactory(factory, server, keeper_dispatcher);
addCommandsHandlersToFactory(factory, server, keeper_dispatcher);
addStorageHandlersToFactory(factory, server, keeper_dispatcher);
@ -163,7 +160,8 @@ static inline auto createHandlersFactoryFromConfig(
"{}.{}.handler.type",
prefix,
key);
if (handler_type == "ready")
addReadinessHandlerToFactory(*main_handler_factory, keeper_dispatcher, config);
if (handler_type == "dashboard")
addDashboardHandlersToFactory(*main_handler_factory, server, keeper_dispatcher);
if (handler_type == "commands")

View File

@ -11,6 +11,7 @@
#include <Poco/Util/LayeredConfiguration.h>
#include <IO/HTTPCommon.h>
#include <IO/ReadHelpers.h>
#include <Common/ZooKeeper/ZooKeeperCommon.h>
#include <memory>
@ -27,9 +28,9 @@ extern const int TIMEOUT_EXCEEDED;
Poco::JSON::Object toJSON(const Coordination::Stat & stat)
{
Poco::JSON::Object result;
result.set("cZxid", stat.czxid);
result.set("mZxid", stat.mzxid);
result.set("pZxid", stat.pzxid);
result.set("czxid", stat.czxid);
result.set("mzxid", stat.mzxid);
result.set("pzxid", stat.pzxid);
result.set("ctime", stat.ctime);
result.set("mtime", stat.mtime);
result.set("version", stat.version);
@ -53,7 +54,7 @@ std::optional<int32_t> getVersionFromRequest(const HTTPServerRequest & request)
try
{
return std::stoi(version_param->second);
return parse<int32_t>(version_param->second);
}
catch (...)
{
@ -136,27 +137,26 @@ Coordination::ResponsePtr KeeperHTTPStorageHandler::awaitKeeperResponse(std::sha
= [response_promise](const Coordination::ResponsePtr & zk_response) mutable { response_promise->set_value(zk_response); };
const auto session_id = keeper_dispatcher->getSessionID(session_timeout.totalMilliseconds());
keeper_dispatcher->registerSession(session_id, response_callback);
SCOPE_EXIT({ keeper_dispatcher->finishSession(session_id); });
try
if (request->isReadRequest())
{
if (!keeper_dispatcher->putRequest(std::move(request), session_id))
throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "Session {} already disconnected", session_id);
if (response_future.wait_for(std::chrono::milliseconds(operation_timeout.totalMilliseconds())) != std::future_status::ready)
throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "Operation timeout ({} ms) exceeded.", operation_timeout.totalMilliseconds());
auto result = response_future.get();
keeper_dispatcher->finishSession(session_id);
return result;
keeper_dispatcher->putLocalReadRequest(std::move(request), session_id);
}
catch (...)
else if (!keeper_dispatcher->putRequest(std::move(request), session_id))
{
// it is obligatory to finish the session, as it affects num_alive_connections metric
keeper_dispatcher->finishSession(session_id);
throw;
throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "Session {} already disconnected", session_id);
}
if (response_future.wait_for(std::chrono::milliseconds(operation_timeout.totalMilliseconds())) != std::future_status::ready)
throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "Operation timeout ({} ms) exceeded.", operation_timeout.totalMilliseconds());
auto result = response_future.get();
keeper_dispatcher->finishSession(session_id);
return result;
}
void KeeperHTTPStorageHandler::performZooKeeperExistsRequest(const std::string & storage_path, HTTPServerResponse & response)

View File

@ -72,3 +72,28 @@ def test_http_readiness_partitioned_cluster(started_cluster):
assert readiness_data["status"] == "fail"
assert readiness_data["details"]["role"] == "follower"
assert readiness_data["details"]["hasLeader"] == False
def test_http_commands_basic_responses(started_cluster):
    """Check that the HTTP `conf` command matches the 4lw `conf` command.

    The check is done first against the leader and then against a follower:
    the endpoint must answer 200 and its JSON "result" field must equal the
    output of the four-letter-word command sent to the same node.
    """
    # The node is resolved inside the loop so the leader is queried before
    # a follower is looked up.
    for pick_node in (keeper_utils.get_leader, keeper_utils.get_any_follower):
        target = pick_node(cluster, [node1, node2, node3])
        url = f"http://{target.ip_address}:{9182}/api/v1/commands/conf"

        http_response = requests.get(url)
        assert http_response.status_code == 200

        payload = http_response.json()
        assert payload["result"] == keeper_utils.send_4lw_cmd(cluster, target, "conf")

View File

@ -0,0 +1,37 @@
<!-- Keeper configuration for the cluster member with server_id 1.
     Presumably loaded by the "node1" test instance; confirm against the test's main_configs. -->
<clickhouse>
<keeper_server>
<!-- Keeper client port. -->
<tcp_port>9181</tcp_port>
<server_id>1</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
<raft_logs_level>trace</raft_logs_level>
</coordination_settings>
<!-- Three-member Raft cluster; servers 2 and 3 are marked start_as_follower, server 1 is not. -->
<raft_configuration>
<server>
<id>1</id>
<hostname>node1</hostname>
<port>9234</port>
</server>
<server>
<id>2</id>
<hostname>node2</hostname>
<port>9234</port>
<start_as_follower>true</start_as_follower>
</server>
<server>
<id>3</id>
<hostname>node3</hostname>
<port>9234</port>
<start_as_follower>true</start_as_follower>
</server>
</raft_configuration>
<!-- HTTP control port; the tests send their HTTP requests to port 9182. -->
<http_control>
<port>9182</port>
</http_control>
</keeper_server>
</clickhouse>

View File

@ -0,0 +1,37 @@
<!-- Keeper configuration for the cluster member with server_id 2.
     Presumably loaded by the "node2" test instance; confirm against the test's main_configs. -->
<clickhouse>
<keeper_server>
<!-- Keeper client port. -->
<tcp_port>9181</tcp_port>
<server_id>2</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
<raft_logs_level>trace</raft_logs_level>
</coordination_settings>
<!-- Three-member Raft cluster; servers 2 and 3 are marked start_as_follower, server 1 is not. -->
<raft_configuration>
<server>
<id>1</id>
<hostname>node1</hostname>
<port>9234</port>
</server>
<server>
<id>2</id>
<hostname>node2</hostname>
<port>9234</port>
<start_as_follower>true</start_as_follower>
</server>
<server>
<id>3</id>
<hostname>node3</hostname>
<port>9234</port>
<start_as_follower>true</start_as_follower>
</server>
</raft_configuration>
<!-- HTTP control port; the tests send their HTTP requests to port 9182. -->
<http_control>
<port>9182</port>
</http_control>
</keeper_server>
</clickhouse>

View File

@ -0,0 +1,37 @@
<!-- Keeper configuration for the cluster member with server_id 3.
     Presumably loaded by the "node3" test instance; confirm against the test's main_configs. -->
<clickhouse>
<keeper_server>
<!-- Keeper client port. -->
<tcp_port>9181</tcp_port>
<server_id>3</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
<raft_logs_level>trace</raft_logs_level>
</coordination_settings>
<!-- Three-member Raft cluster; servers 2 and 3 are marked start_as_follower, server 1 is not. -->
<raft_configuration>
<server>
<id>1</id>
<hostname>node1</hostname>
<port>9234</port>
</server>
<server>
<id>2</id>
<hostname>node2</hostname>
<port>9234</port>
<start_as_follower>true</start_as_follower>
</server>
<server>
<id>3</id>
<hostname>node3</hostname>
<port>9234</port>
<start_as_follower>true</start_as_follower>
</server>
</raft_configuration>
<!-- HTTP control port; the tests send their HTTP requests to port 9182. -->
<http_control>
<port>9182</port>
</http_control>
</keeper_server>
</clickhouse>

View File

@ -0,0 +1,121 @@
#!/usr/bin/env python3
import os
import pytest
import requests
from helpers.cluster import ClickHouseCluster
from helpers.network import PartitionManager
import helpers.keeper_utils as keeper_utils
cluster = ClickHouseCluster(__file__)
CONFIG_DIR = os.path.join(os.path.dirname(os.path.realpath(__file__)), "configs")
node1 = cluster.add_instance(
"node1", main_configs=["configs/enable_keeper1.xml"], stay_alive=True
)
node2 = cluster.add_instance(
"node2", main_configs=["configs/enable_keeper2.xml"], stay_alive=True
)
node3 = cluster.add_instance(
"node3", main_configs=["configs/enable_keeper3.xml"], stay_alive=True
)
@pytest.fixture(scope="module")
def started_cluster():
    """Module-scoped fixture: start the cluster, yield it, then shut it down."""
    try:
        cluster.start()
        yield cluster
    finally:
        # Runs whether startup or any test using the fixture failed or not.
        cluster.shutdown()
def send_storage_request(
    node, path, data=None, params=None, expected_response_code=200, timeout=30
):
    """POST a request to the Keeper HTTP storage API of `node` and check its status.

    Args:
        node: cluster instance to send the request to (its `ip_address` is used).
        path: part of the URL after `/api/v1/storage`, e.g. `/get/some_node`.
        data: optional request body.
        params: optional query-string parameters.
        expected_response_code: HTTP status code the response must carry.
        timeout: seconds to wait for the server before `requests` raises;
            without it a stuck server would hang the test run indefinitely.

    Returns:
        The `requests` response object, after the status code was asserted.
    """
    response = requests.post(
        "http://{host}:9182/api/v1/storage{path}".format(
            host=node.ip_address, path=path
        ),
        data=data,
        params=params,
        timeout=timeout,
    )
    # Show the response body on a mismatch to make failures easier to diagnose.
    assert response.status_code == expected_response_code, response.text
    return response
def test_keeper_http_storage_create_get_exists(started_cluster):
    """Create a node through a follower, then check `exists` and `get` on it.

    Also checks that `get` and `exists` answer 404 for a missing child path.
    """
    node = keeper_utils.get_any_follower(cluster, [node1, node2, node3])
    payload = b"test_data"
    znode = "test_storage_get"

    send_storage_request(node, f"/create/{znode}", payload)
    send_storage_request(node, f"/exists/{znode}")

    fetched = send_storage_request(node, f"/get/{znode}")
    assert fetched.content == payload

    # A child that was never created must be reported as not found.
    for operation in ("get", "exists"):
        send_storage_request(
            node, f"/{operation}/{znode}/not_found", expected_response_code=404
        )
def test_keeper_http_storage_set(started_cluster):
    """Exercise the `set` storage endpoint through a follower.

    Covers a successful versioned set, a set without the `version` parameter
    (expected 400) and a set on a missing node (expected 404).
    """
    node = keeper_utils.get_any_follower(cluster, [node1, node2, node3])
    znode = "test_storage_set"

    # A node created without a body starts out empty.
    send_storage_request(node, f"/create/{znode}")
    assert send_storage_request(node, f"/get/{znode}").content == b""

    new_value = b"test_content"
    send_storage_request(node, f"/set/{znode}", new_value, params={"version": 0})
    assert send_storage_request(node, f"/get/{znode}").content == new_value

    # version is not set
    send_storage_request(node, f"/set/{znode}", new_value, expected_response_code=400)

    # node not found
    send_storage_request(
        node,
        f"/set/{znode}/not_found",
        new_value,
        params={"version": 0},
        expected_response_code=404,
    )
def test_keeper_http_storage_list_remove(started_cluster):
    """Exercise the `list` and `remove` storage endpoints through a follower.

    Creates a parent with children a, b and c, removes b and checks the
    listing before and after. Also checks that `remove` without the `version`
    parameter answers 400 and that `list` on a missing node answers 404.
    """
    follower = keeper_utils.get_any_follower(cluster, [node1, node2, node3])
    parent = "/test_storage_list"

    send_storage_request(follower, f"/create{parent}")
    for child in ("a", "b", "c"):
        send_storage_request(follower, f"/create{parent}/{child}")

    response = send_storage_request(follower, f"/list{parent}")
    assert sorted(response.json()["child_node_names"]) == ["a", "b", "c"]

    send_storage_request(follower, f"/remove{parent}/b", params={"version": 0})

    response = send_storage_request(follower, f"/list{parent}")
    assert sorted(response.json()["child_node_names"]) == ["a", "c"]

    # version is not set
    send_storage_request(follower, f"/remove{parent}/a", expected_response_code=400)

    # node not found; only the status code matters, so the response is not kept
    send_storage_request(
        follower, f"/list{parent}/not_found", expected_response_code=404
    )