This commit is contained in:
speeedmaster 2024-09-19 09:05:44 +02:00 committed by GitHub
commit c678269d14
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
26 changed files with 2675 additions and 150 deletions

View File

@ -38,7 +38,7 @@
#include <Server/HTTP/HTTPServer.h>
#include <Server/HTTPHandlerFactory.h>
#include <Server/KeeperReadinessHandler.h>
#include <Server/KeeperHTTPHandlerFactory.h>
#include <Server/PrometheusRequestHandlerFactory.h>
#include <Server/TCPServer.h>
@ -522,7 +522,7 @@ try
"Prometheus: http://" + address.toString(),
std::make_unique<HTTPServer>(
std::move(my_http_context),
createKeeperPrometheusHandlerFactory(*this, config_getter(), async_metrics, "PrometheusHandler-factory"),
createKeeperPrometheusHandlerFactory(*this, config, async_metrics, "PrometheusHandler-factory"),
server_pool,
socket,
http_params));
@ -533,10 +533,6 @@ try
createServer(listen_host, port_name, listen_try, [&](UInt16 port) mutable
{
auto my_http_context = httpContext();
Poco::Timespan my_keep_alive_timeout(config.getUInt("keep_alive_timeout", 10), 0);
Poco::Net::HTTPServerParams::Ptr my_http_params = new Poco::Net::HTTPServerParams;
my_http_params->setTimeout(my_http_context->getReceiveTimeout());
my_http_params->setKeepAliveTimeout(my_keep_alive_timeout);
Poco::Net::ServerSocket socket;
auto address = socketBindListen(socket, listen_host, port);
@ -547,8 +543,33 @@ try
port_name,
"HTTP Control: http://" + address.toString(),
std::make_unique<HTTPServer>(
std::move(my_http_context), createKeeperHTTPControlMainHandlerFactory(config_getter(), global_context->getKeeperDispatcher(), "KeeperHTTPControlHandler-factory"), server_pool, socket, http_params)
);
std::move(my_http_context),
createKeeperHTTPHandlerFactory(*this, config, global_context->getKeeperDispatcher(), "KeeperHTTPHandler-factory"),
server_pool,
socket,
http_params));
});
/// HTTPS control endpoints
port_name = "keeper_server.http_control.secure_port";
createServer(listen_host, port_name, listen_try, [&](UInt16 port) mutable
{
auto my_http_context = httpContext();
Poco::Net::ServerSocket socket;
auto address = socketBindListen(socket, listen_host, port);
socket.setReceiveTimeout(my_http_context->getReceiveTimeout());
socket.setSendTimeout(my_http_context->getSendTimeout());
servers->emplace_back(
listen_host,
port_name,
"HTTPS Control: https://" + address.toString(),
std::make_unique<HTTPServer>(
std::move(my_http_context),
createKeeperHTTPHandlerFactory(*this, config, global_context->getKeeperDispatcher(), "KeeperHTTPSHandler-factory"),
server_pool,
socket,
http_params));
});
}

File diff suppressed because it is too large Load Diff

View File

@ -103,7 +103,7 @@
#include <Server/ProxyV1HandlerFactory.h>
#include <Server/TLSHandlerFactory.h>
#include <Server/ProtocolServerAdapter.h>
#include <Server/KeeperReadinessHandler.h>
#include <Server/KeeperHTTPHandlerFactory.h>
#include <Server/HTTP/HTTPServer.h>
#include <Server/CloudPlacementInfo.h>
#include <Interpreters/AsynchronousInsertQueue.h>
@ -1885,10 +1885,11 @@ try
"HTTP Control: http://" + address.toString(),
std::make_unique<HTTPServer>(
std::move(http_context),
createKeeperHTTPControlMainHandlerFactory(
config_getter(),
global_context->getKeeperDispatcher(),
"KeeperHTTPControlHandler-factory"), server_pool, socket, http_params));
createKeeperHTTPHandlerFactory(
*this, config_getter(), global_context->getKeeperDispatcher(), "KeeperHTTPHandler-factory"),
server_pool,
socket,
http_params));
});
}
#else

View File

@ -37,7 +37,7 @@ public:
int32_t code();
static String toName(int32_t code);
static inline int32_t toCode(const String & name);
static int32_t toCode(const String & name);
protected:
KeeperDispatcher & keeper_dispatcher;

View File

@ -335,7 +335,7 @@ void KeeperDispatcher::responseThread()
try
{
setResponse(response_for_session.session_id, response_for_session.response, response_for_session.request);
setResponse(response_for_session.session_id, response_for_session.response, response_for_session.request);
}
catch (...)
{
@ -438,6 +438,34 @@ bool KeeperDispatcher::putRequest(const Coordination::ZooKeeperRequestPtr & requ
return true;
}
bool KeeperDispatcher::putLocalReadRequest(const Coordination::ZooKeeperRequestPtr & request, int64_t session_id)
{
if (!request->isReadRequest())
{
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot put non-read request locally");
}
{
/// If session was already disconnected than we will ignore requests
std::lock_guard lock(session_to_response_callback_mutex);
if (!session_to_response_callback.contains(session_id))
return false;
}
KeeperStorageBase::RequestForSession request_info;
request_info.request = request;
using namespace std::chrono;
request_info.time = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
request_info.session_id = session_id;
if (keeper_context->isShutdownCalled())
return false;
server->putLocalReadRequest(request_info);
CurrentMetrics::add(CurrentMetrics::KeeperOutstandingRequests);
return true;
}
void KeeperDispatcher::initialize(const Poco::Util::AbstractConfiguration & config, bool standalone_keeper, bool start_async, const MultiVersion<Macros>::Version & macros)
{
LOG_DEBUG(log, "Initializing storage dispatcher");

View File

@ -142,6 +142,9 @@ public:
/// Put request to ClickHouse Keeper
bool putRequest(const Coordination::ZooKeeperRequestPtr & request, int64_t session_id);
/// Put local read request to ClickHouse Keeper
bool putLocalReadRequest(const Coordination::ZooKeeperRequestPtr & request, int64_t session_id);
/// Get new session ID
int64_t getSessionID(int64_t session_timeout_ms);

View File

@ -173,4 +173,17 @@ void HTTPServerRequest::readRequest(ReadBuffer & in)
setVersion(version);
}
std::string HTTPServerRequest::toStringForLogging() const
{
return fmt::format(
"Method: {}, Address: {}, User-Agent: {}{}, Content Type: {}, Transfer Encoding: {}, X-Forwarded-For: {}",
getMethod(),
clientAddress().toString(),
get("User-Agent", "(none)"),
(hasContentLength() ? fmt::format(", Length: {}", getContentLength()) : ("")),
getContentType(),
getTransferEncoding(),
get("X-Forwarded-For", "(none)"));
}
}

View File

@ -48,6 +48,8 @@ public:
Poco::Net::X509Certificate peerCertificate() const;
#endif
std::string toStringForLogging() const;
private:
/// Limits for basic sanity checks when reading a header
enum Limits

View File

@ -85,6 +85,17 @@ public:
});
}
/// Handle GET, HEAD or POST endpoint on specified path
void allowGetHeadAndPostRequest()
{
addFilter([](const auto & request)
{
return request.getMethod() == Poco::Net::HTTPRequest::HTTP_GET
|| request.getMethod() == Poco::Net::HTTPRequest::HTTP_HEAD
|| request.getMethod() == Poco::Net::HTTPRequest::HTTP_POST;
});
}
/// Handle Post request or (Get or Head) with params or OPTIONS requests
void allowPostAndGetParamsAndOptionsRequest()
{

View File

@ -13,10 +13,7 @@ HTTPRequestHandlerFactoryMain::HTTPRequestHandlerFactoryMain(const std::string &
std::unique_ptr<HTTPRequestHandler> HTTPRequestHandlerFactoryMain::createRequestHandler(const HTTPServerRequest & request)
{
LOG_TRACE(log, "HTTP Request for {}. Method: {}, Address: {}, User-Agent: {}{}, Content Type: {}, Transfer Encoding: {}, X-Forwarded-For: {}",
name, request.getMethod(), request.clientAddress().toString(), request.get("User-Agent", "(none)"),
(request.hasContentLength() ? (", Length: " + std::to_string(request.getContentLength())) : ("")),
request.getContentType(), request.getTransferEncoding(), request.get("X-Forwarded-For", "(none)"));
LOG_TRACE(log, "HTTP Request for {}. {}", name, request.toStringForLogging());
for (auto & handler_factory : child_factories)
{

View File

@ -0,0 +1,104 @@
#include <Server/KeeperDashboardRequestHandler.h>
#if USE_NURAFT
#include <Server/HTTP/WriteBufferFromHTTPServerResponse.h>
#include <Poco/Net/HTTPServerResponse.h>
#include <Poco/Util/LayeredConfiguration.h>
#include <Poco/JSON/JSON.h>
#include <Poco/JSON/Object.h>
#include <Poco/JSON/Stringifier.h>
#include <IO/HTTPCommon.h>
#include <IO/Operators.h>
#include <Interpreters/Context.h>
#include <Common/config_version.h>
#include <Common/re2.h>
#include <incbin.h>
/// Embedded HTML pages
INCBIN(resource_keeper_dashboard_html, SOURCE_DIR "/programs/keeper/dashboard.html");
namespace DB
{
void KeeperDashboardWebUIRequestHandler::handleRequest(
HTTPServerRequest & request, HTTPServerResponse & response, const ProfileEvents::Event &)
{
std::string html(reinterpret_cast<const char *>(gresource_keeper_dashboard_htmlData), gresource_keeper_dashboard_htmlSize);
/// Replace a link to external JavaScript file to embedded file.
/// This allows to open the HTML without running a server and to host it on server.
/// Note: we can embed the JavaScript file inline to the HTML,
/// but we don't do it to keep the "view-source" perfectly readable.
static re2::RE2 uplot_url = R"(https://[^\s"'`]+u[Pp]lot[^\s"'`]*\.js)";
RE2::Replace(&html, uplot_url, "/js/uplot.js");
static re2::RE2 lz_string_url = R"(https://[^\s"'`]+lz-string[^\s"'`]*\.js)";
RE2::Replace(&html, lz_string_url, "/js/lz-string.js");
response.setContentType("text/html; charset=UTF-8");
if (request.getVersion() == HTTPServerRequest::HTTP_1_1)
response.setChunkedTransferEncoding(true);
setResponseDefaultHeaders(response);
response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_OK);
WriteBufferFromHTTPServerResponse(response, request.getMethod() == HTTPRequest::HTTP_HEAD)
.write(html.data(), html.size());
}
void KeeperDashboardContentRequestHandler::handleRequest(
HTTPServerRequest & /*request*/, HTTPServerResponse & response, const ProfileEvents::Event &)
try
{
Poco::JSON::Object response_json;
response_json.set("ch_version", VERSION_DESCRIBE);
if (keeper_dispatcher->isServerActive())
{
Poco::JSON::Object keeper_details;
auto & stats = keeper_dispatcher->getKeeperConnectionStats();
Keeper4LWInfo keeper_info = keeper_dispatcher->getKeeper4LWInfo();
keeper_details.set("latency_avg", stats.getAvgLatency());
keeper_details.set("role", keeper_info.getRole());
keeper_details.set("alive_connections", toString(keeper_info.alive_connections_count));
keeper_details.set("node_count", toString(keeper_info.total_nodes_count));
response_json.set("keeper_details", keeper_details);
}
std::ostringstream oss; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
oss.exceptions(std::ios::failbit);
Poco::JSON::Stringifier::stringify(response_json, oss);
response.setContentType("application/json");
*response.send() << oss.str();
}
catch (...)
{
tryLogCurrentException("KeeperDashboardContentRequestHandler");
try
{
response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_INTERNAL_SERVER_ERROR);
if (!response.sent())
{
/// We have not sent anything yet and we don't even know if we need to compress response.
*response.send() << getCurrentExceptionMessage(false) << '\n';
}
}
catch (...)
{
LOG_ERROR(getLogger("KeeperDashboardContentRequestHandler"), "Cannot send exception to client");
}
}
}
#endif

View File

@ -0,0 +1,36 @@
#pragma once
#include "config.h"
#if USE_NURAFT
#include <Coordination/KeeperDispatcher.h>
#include <Server/HTTP/HTTPRequestHandler.h>
namespace DB
{
/// Response with HTML page that allows to send queries and show results in browser.
class KeeperDashboardWebUIRequestHandler : public HTTPRequestHandler
{
public:
explicit KeeperDashboardWebUIRequestHandler() = default;
void handleRequest(HTTPServerRequest & request, HTTPServerResponse & response, const ProfileEvents::Event & write_event) override;
};
/// Response with json containing dashboard information to be displayed
class KeeperDashboardContentRequestHandler : public HTTPRequestHandler
{
private:
std::shared_ptr<KeeperDispatcher> keeper_dispatcher;
public:
explicit KeeperDashboardContentRequestHandler(std::shared_ptr<KeeperDispatcher> keeper_dispatcher_)
: keeper_dispatcher(keeper_dispatcher_)
{
}
void handleRequest(HTTPServerRequest & request, HTTPServerResponse & response, const ProfileEvents::Event & write_event) override;
};
}
#endif

View File

@ -0,0 +1,355 @@
#include <Server/KeeperHTTPHandlerFactory.h>
#if USE_NURAFT
#include <memory>
#include <Coordination/FourLetterCommand.h>
#include <Coordination/KeeperDispatcher.h>
#include <IO/HTTPCommon.h>
#include <IO/Operators.h>
#include <Server/HTTP/WriteBufferFromHTTPServerResponse.h>
#include <Server/HTTPHandlerFactory.h>
#include <Server/HTTPHandlerRequestFilter.h>
#include <Server/IServer.h>
#include <Poco/JSON/JSON.h>
#include <Poco/JSON/Object.h>
#include <Poco/JSON/Stringifier.h>
#include <Poco/Net/HTTPRequestHandlerFactory.h>
#include <Poco/Net/HTTPServerRequest.h>
#include <Poco/Net/HTTPServerResponse.h>
#include <Server/KeeperDashboardRequestHandler.h>
#include <Server/KeeperHTTPStorageHandler.h>
#include <Server/KeeperNotFoundHandler.h>
#include <Common/ZooKeeper/ZooKeeperConstants.h>
namespace DB
{
namespace ErrorCodes
{
extern const int INVALID_CONFIG_PARAMETER;
extern const int UNKNOWN_ELEMENT_IN_CONFIG;
}
KeeperHTTPRequestHandlerFactory::KeeperHTTPRequestHandlerFactory(const std::string & name_) : log(getLogger(name_)), name(name_)
{
}
std::unique_ptr<HTTPRequestHandler> KeeperHTTPRequestHandlerFactory::createRequestHandler(const HTTPServerRequest & request)
{
LOG_TRACE(log, "HTTP Request for {}. {}", name, request.toStringForLogging());
for (auto & handler_factory : child_factories)
{
auto handler = handler_factory->createRequestHandler(request);
if (handler)
return handler;
}
if (request.getMethod() == Poco::Net::HTTPRequest::HTTP_GET || request.getMethod() == Poco::Net::HTTPRequest::HTTP_HEAD
|| request.getMethod() == Poco::Net::HTTPRequest::HTTP_POST)
{
return std::unique_ptr<HTTPRequestHandler>(new KeeperNotFoundHandler(hints.getHints(request.getURI())));
}
return nullptr;
}
void addDashboardHandlersToFactory(
KeeperHTTPRequestHandlerFactory & factory, std::shared_ptr<KeeperDispatcher> keeper_dispatcher)
{
auto dashboard_ui_creator = []() -> std::unique_ptr<KeeperDashboardWebUIRequestHandler>
{ return std::make_unique<KeeperDashboardWebUIRequestHandler>(); };
auto dashboard_handler = std::make_shared<HandlingRuleHTTPHandlerFactory<KeeperDashboardWebUIRequestHandler>>(dashboard_ui_creator);
dashboard_handler->attachStrictPath("/dashboard");
dashboard_handler->allowGetAndHeadRequest();
factory.addPathToHints("/dashboard");
factory.addHandler(dashboard_handler);
auto dashboard_content_creator = [keeper_dispatcher]() -> std::unique_ptr<KeeperDashboardContentRequestHandler>
{ return std::make_unique<KeeperDashboardContentRequestHandler>(keeper_dispatcher); };
auto dashboard_content_handler
= std::make_shared<HandlingRuleHTTPHandlerFactory<KeeperDashboardContentRequestHandler>>(dashboard_content_creator);
dashboard_content_handler->attachStrictPath("/dashboard/content");
dashboard_content_handler->allowGetAndHeadRequest();
factory.addHandler(dashboard_content_handler);
}
void addReadinessHandlerToFactory(
KeeperHTTPRequestHandlerFactory & factory,
std::shared_ptr<KeeperDispatcher> keeper_dispatcher,
const Poco::Util::AbstractConfiguration & config)
{
auto creator = [keeper_dispatcher]() -> std::unique_ptr<KeeperHTTPReadinessHandler>
{ return std::make_unique<KeeperHTTPReadinessHandler>(keeper_dispatcher); };
auto readiness_handler = std::make_shared<HandlingRuleHTTPHandlerFactory<KeeperHTTPReadinessHandler>>(std::move(creator));
readiness_handler->attachStrictPath(config.getString("keeper_server.http_control.readiness.endpoint", "/ready"));
readiness_handler->allowGetAndHeadRequest();
factory.addPathToHints("/ready");
factory.addHandler(readiness_handler);
}
void addCommandsHandlersToFactory(KeeperHTTPRequestHandlerFactory & factory, std::shared_ptr<KeeperDispatcher> keeper_dispatcher)
{
auto creator = [keeper_dispatcher]() -> std::unique_ptr<KeeperHTTPCommandsHandler>
{ return std::make_unique<KeeperHTTPCommandsHandler>(keeper_dispatcher); };
auto commads_handler = std::make_shared<HandlingRuleHTTPHandlerFactory<KeeperHTTPCommandsHandler>>(std::move(creator));
commads_handler->attachNonStrictPath("/api/v1/commands");
commads_handler->allowGetHeadAndPostRequest();
factory.addPathToHints("/api/v1/commands");
factory.addHandler(commads_handler);
}
void addStorageHandlersToFactory(
KeeperHTTPRequestHandlerFactory & factory, const IServer & server, std::shared_ptr<KeeperDispatcher> keeper_dispatcher)
{
auto creator = [&server, keeper_dispatcher]() -> std::unique_ptr<KeeperHTTPStorageHandler>
{ return std::make_unique<KeeperHTTPStorageHandler>(server, keeper_dispatcher); };
auto storage_handler = std::make_shared<HandlingRuleHTTPHandlerFactory<KeeperHTTPStorageHandler>>(std::move(creator));
storage_handler->attachNonStrictPath("/api/v1/storage");
storage_handler->allowGetHeadAndPostRequest();
factory.addPathToHints("/api/v1/storage");
factory.addHandler(storage_handler);
}
void addDefaultHandlersToFactory(
KeeperHTTPRequestHandlerFactory & factory,
const IServer & server,
std::shared_ptr<KeeperDispatcher> keeper_dispatcher,
const Poco::Util::AbstractConfiguration & config)
{
addReadinessHandlerToFactory(factory, keeper_dispatcher, config);
addDashboardHandlersToFactory(factory, keeper_dispatcher);
addCommandsHandlersToFactory(factory, keeper_dispatcher);
addStorageHandlersToFactory(factory, server, keeper_dispatcher);
}
static inline auto createHandlersFactoryFromConfig(
const IServer & server,
std::shared_ptr<KeeperDispatcher> keeper_dispatcher,
const Poco::Util::AbstractConfiguration & config,
const std::string & name,
const String & prefix)
{
auto main_handler_factory = std::make_shared<KeeperHTTPRequestHandlerFactory>(name);
Poco::Util::AbstractConfiguration::Keys keys;
config.keys(prefix, keys);
for (const auto & key : keys)
{
if (key == "defaults")
{
addDefaultHandlersToFactory(*main_handler_factory, server, keeper_dispatcher, config);
}
else if (startsWith(key, "rule"))
{
const auto & handler_type = config.getString(prefix + "." + key + ".handler.type", "");
if (handler_type.empty())
throw Exception(
ErrorCodes::INVALID_CONFIG_PARAMETER,
"Handler type in config is not specified here: "
"{}.{}.handler.type",
prefix,
key);
else if (handler_type == "ready")
addReadinessHandlerToFactory(*main_handler_factory, keeper_dispatcher, config);
else if (handler_type == "dashboard")
addDashboardHandlersToFactory(*main_handler_factory, keeper_dispatcher);
else if (handler_type == "commands")
addCommandsHandlersToFactory(*main_handler_factory, keeper_dispatcher);
else if (handler_type == "storage")
addStorageHandlersToFactory(*main_handler_factory, server, keeper_dispatcher);
else
throw Exception(
ErrorCodes::INVALID_CONFIG_PARAMETER,
"Unknown handler type '{}' in config here: {}.{}.handler.type",
handler_type,
prefix,
key);
}
else
throw Exception(
ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG,
"Unknown element in config: "
"{}.{}, must be 'rule' or 'defaults'",
prefix,
key);
}
return main_handler_factory;
}
KeeperHTTPReadinessHandler::KeeperHTTPReadinessHandler(std::shared_ptr<KeeperDispatcher> keeper_dispatcher_)
: log(getLogger("KeeperHTTPReadinessHandler")), keeper_dispatcher(keeper_dispatcher_)
{
}
void KeeperHTTPReadinessHandler::handleRequest(
HTTPServerRequest & /*request*/, HTTPServerResponse & response, const ProfileEvents::Event & /*write_event*/)
{
try
{
auto is_leader = keeper_dispatcher->isLeader();
auto is_follower = keeper_dispatcher->isFollower() && keeper_dispatcher->hasLeader();
auto is_observer = keeper_dispatcher->isObserver() && keeper_dispatcher->hasLeader();
auto data = keeper_dispatcher->getKeeper4LWInfo();
auto status = is_leader || is_follower || is_observer;
Poco::JSON::Object json, details;
details.set("role", data.getRole());
details.set("hasLeader", keeper_dispatcher->hasLeader());
json.set("details", details);
json.set("status", status ? "ok" : "fail");
std::ostringstream oss; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
oss.exceptions(std::ios::failbit);
Poco::JSON::Stringifier::stringify(json, oss);
if (!status)
response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_SERVICE_UNAVAILABLE);
*response.send() << oss.str();
}
catch (...)
{
tryLogCurrentException(log);
try
{
response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_INTERNAL_SERVER_ERROR);
if (!response.sent())
{
/// We have not sent anything yet and we don't even know if we need to compress response.
*response.send() << getCurrentExceptionMessage(false) << '\n';
}
}
catch (...)
{
LOG_ERROR(log, "Cannot send exception to client");
}
}
}
KeeperHTTPCommandsHandler::KeeperHTTPCommandsHandler(std::shared_ptr<KeeperDispatcher> keeper_dispatcher_)
: log(getLogger("KeeperHTTPCommandsHandler")), keeper_dispatcher(keeper_dispatcher_)
{
}
void KeeperHTTPCommandsHandler::handleRequest(
HTTPServerRequest & request, HTTPServerResponse & response, const ProfileEvents::Event & /*write_event*/)
try
{
std::vector<std::string> uri_segments;
try
{
Poco::URI uri(request.getURI());
uri.getPathSegments(uri_segments);
}
catch (...)
{
response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_BAD_REQUEST, "Could not parse request path.");
*response.send() << "Could not parse request path.\n";
return;
}
/// non-strict path "/api/v1/commands" filter is already attached
if (uri_segments.size() != 4)
{
response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_BAD_REQUEST, "Invalid command path");
*response.send() << "Invalid command path\n";
return;
}
const auto command = uri_segments[3];
setResponseDefaultHeaders(response);
Poco::JSON::Object response_json;
response.setContentType("application/json");
if (!FourLetterCommandFactory::instance().isKnown(DB::IFourLetterCommand::toCode(command)))
{
LOG_INFO(log, "Invalid four letter command: {}", command);
response_json.set("message", "Invalid four letter command.");
response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_BAD_REQUEST);
}
else if (!FourLetterCommandFactory::instance().isEnabled(DB::IFourLetterCommand::toCode(command)))
{
LOG_INFO(log, "Not enabled four letter command: {}", command);
response_json.set("message", "Command is disabled. Check server settings.");
response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_FORBIDDEN);
}
else
{
auto command_ptr = FourLetterCommandFactory::instance().get(DB::IFourLetterCommand::toCode(command));
LOG_DEBUG(log, "Received four letter command {}", command_ptr->name());
try
{
String res = command_ptr->run();
response_json.set("result", res);
response.setStatus(Poco::Net::HTTPResponse::HTTP_OK);
}
catch (...)
{
tryLogCurrentException(log, "Error when executing four letter command " + command_ptr->name());
response_json.set("message", "Internal server error.");
response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_INTERNAL_SERVER_ERROR);
}
}
std::ostringstream oss; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
oss.exceptions(std::ios::failbit);
Poco::JSON::Stringifier::stringify(response_json, oss);
*response.send() << oss.str();
}
catch (...)
{
tryLogCurrentException(log);
try
{
response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_INTERNAL_SERVER_ERROR);
if (!response.sent())
{
/// We have not sent anything yet and we don't even know if we need to compress response.
*response.send() << getCurrentExceptionMessage(false) << '\n';
}
}
catch (...)
{
LOG_ERROR(log, "Cannot send exception to client");
}
}
HTTPRequestHandlerFactoryPtr createKeeperHTTPHandlerFactory(
const IServer & server,
const Poco::Util::AbstractConfiguration & config,
std::shared_ptr<KeeperDispatcher> keeper_dispatcher,
const std::string & name)
{
if (config.has("keeper_server.http_control.handlers"))
return createHandlersFactoryFromConfig(server, keeper_dispatcher, config, name, "keeper_server.http_control.handlers");
auto factory = std::make_shared<KeeperHTTPRequestHandlerFactory>(name);
addDefaultHandlersToFactory(*factory, server, keeper_dispatcher, config);
return factory;
}
}
#endif

View File

@ -0,0 +1,69 @@
#pragma once
#include <cstdint>
#include <config.h>
#if USE_NURAFT
#include <Coordination/KeeperDispatcher.h>
#include <Server/HTTP/HTTPRequestHandler.h>
#include <Server/HTTP/HTTPRequestHandlerFactory.h>
#include <Server/HTTPPathHints.h>
namespace DB
{
class IServer;
/// Handle request using child handlers
class KeeperHTTPRequestHandlerFactory : public HTTPRequestHandlerFactory
{
public:
explicit KeeperHTTPRequestHandlerFactory(const std::string & name_);
void addHandler(HTTPRequestHandlerFactoryPtr child_factory) { child_factories.emplace_back(child_factory); }
void addPathToHints(const std::string & http_path) { hints.add(http_path); }
std::unique_ptr<HTTPRequestHandler> createRequestHandler(const HTTPServerRequest & request) override;
private:
LoggerPtr log;
std::string name;
HTTPPathHints hints;
std::vector<HTTPRequestHandlerFactoryPtr> child_factories;
};
class KeeperHTTPReadinessHandler : public HTTPRequestHandler, WithContext
{
private:
LoggerPtr log;
std::shared_ptr<KeeperDispatcher> keeper_dispatcher;
public:
explicit KeeperHTTPReadinessHandler(std::shared_ptr<KeeperDispatcher> keeper_dispatcher_);
void handleRequest(HTTPServerRequest & request, HTTPServerResponse & response, const ProfileEvents::Event & write_event) override;
};
class KeeperHTTPCommandsHandler : public HTTPRequestHandler
{
private:
LoggerPtr log;
std::shared_ptr<KeeperDispatcher> keeper_dispatcher;
public:
explicit KeeperHTTPCommandsHandler(std::shared_ptr<KeeperDispatcher> keeper_dispatcher_);
void handleRequest(HTTPServerRequest & request, HTTPServerResponse & response, const ProfileEvents::Event & write_event) override;
};
HTTPRequestHandlerFactoryPtr createKeeperHTTPHandlerFactory(
const IServer & server,
const Poco::Util::AbstractConfiguration & config,
std::shared_ptr<KeeperDispatcher> keeper_dispatcher,
const std::string & name);
}
#endif

View File

@ -0,0 +1,413 @@
#include <Server/KeeperHTTPStorageHandler.h>
#if USE_NURAFT
#include "IServer.h"
#include <Poco/JSON/JSON.h>
#include <Poco/JSON/Object.h>
#include <Poco/JSON/Stringifier.h>
#include <Poco/Net/HTTPServerResponse.h>
#include <Poco/Util/LayeredConfiguration.h>
#include <IO/HTTPCommon.h>
#include <IO/Operators.h>
#include <IO/ReadHelpers.h>
#include <Common/ZooKeeper/ZooKeeperCommon.h>
#include <memory>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int TIMEOUT_EXCEEDED;
}
Poco::JSON::Object toJSON(const Coordination::Stat & stat)
{
Poco::JSON::Object result;
result.set("czxid", stat.czxid);
result.set("mzxid", stat.mzxid);
result.set("pzxid", stat.pzxid);
result.set("ctime", stat.ctime);
result.set("mtime", stat.mtime);
result.set("version", stat.version);
result.set("cversion", stat.cversion);
result.set("aversion", stat.aversion);
result.set("ephemeralOwner", stat.ephemeralOwner);
result.set("dataLength", stat.dataLength);
result.set("numChildren", stat.numChildren);
return result;
}
std::optional<int32_t> getVersionFromRequest(const HTTPServerRequest & request)
{
/// we store version argument as a "version" query parameter
Poco::URI uri(request.getURI());
const auto query_params = uri.getQueryParameters();
const auto version_param
= std::find_if(query_params.begin(), query_params.begin(), [](const auto & param) { return param.first == "version"; });
if (version_param == query_params.end())
return std::nullopt;
try
{
return parse<int32_t>(version_param->second);
}
catch (...)
{
return std::nullopt;
}
}
std::string getRawBytesFromRequest(HTTPServerRequest & request)
{
std::string request_data;
char ch = 0;
while (request.getStream().read(ch))
request_data += ch;
return request_data;
}
bool setErrorResponseForZKCode(Coordination::Error error, HTTPServerResponse & response)
{
switch (error)
{
case Coordination::Error::ZOK:
return false;
case Coordination::Error::ZNONODE:
response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_NOT_FOUND, "Node not found.");
*response.send() << "Requested node not found.\n";
return true;
case Coordination::Error::ZBADVERSION:
response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_CONFLICT, "Version conflict.");
*response.send() << "Version conflict. Check the current version and try again.\n";
return true;
default:
response.setStatus(Poco::Net::HTTPResponse::HTTP_INTERNAL_SERVER_ERROR);
*response.send() << "Keeper request finished with error: " << errorMessage(error) << ".\n";
return true;
}
}
KeeperHTTPStorageHandler::KeeperHTTPStorageHandler(const IServer & server_, std::shared_ptr<KeeperDispatcher> keeper_dispatcher_)
: log(getLogger("KeeperHTTPStorageHandler"))
, server(server_)
, keeper_dispatcher(keeper_dispatcher_)
, session_timeout(
server.config().getUInt("keeper_server.http_control.storage.session_timeout", Coordination::DEFAULT_SESSION_TIMEOUT_MS)
* Poco::Timespan::MILLISECONDS)
, operation_timeout(
server.config().getUInt("keeper_server.http_control.storage.operation_timeout", Coordination::DEFAULT_OPERATION_TIMEOUT_MS)
* Poco::Timespan::MILLISECONDS)
{
}
void KeeperHTTPStorageHandler::performZooKeeperRequest(
Coordination::OpNum opnum, const std::string & storage_path, HTTPServerRequest & request, HTTPServerResponse & response)
{
switch (opnum)
{
case Coordination::OpNum::Exists:
performZooKeeperExistsRequest(storage_path, response);
return;
case Coordination::OpNum::List:
performZooKeeperListRequest(storage_path, response);
return;
case Coordination::OpNum::Get:
performZooKeeperGetRequest(storage_path, response);
return;
case Coordination::OpNum::Set:
performZooKeeperSetRequest(storage_path, request, response);
return;
case Coordination::OpNum::Create:
performZooKeeperCreateRequest(storage_path, request, response);
return;
case Coordination::OpNum::Remove:
performZooKeeperRemoveRequest(storage_path, request, response);
return;
default:
throw Exception(ErrorCodes::LOGICAL_ERROR, "Trying to perform ZK request for unsupported OpNum. It's a bug.");
}
}
Coordination::ZooKeeperResponsePtr KeeperHTTPStorageHandler::awaitKeeperResponse(std::shared_ptr<Coordination::ZooKeeperRequest> request)
{
auto response_promise = std::make_shared<std::promise<Coordination::ZooKeeperResponsePtr>>();
auto response_future = response_promise->get_future();
auto response_callback
= [response_promise](const Coordination::ZooKeeperResponsePtr & zk_response, Coordination::ZooKeeperRequestPtr) mutable
{ response_promise->set_value(zk_response); };
const auto session_id = keeper_dispatcher->getSessionID(session_timeout.totalMilliseconds());
keeper_dispatcher->registerSession(session_id, response_callback);
SCOPE_EXIT({ keeper_dispatcher->finishSession(session_id); });
if (request->isReadRequest())
{
keeper_dispatcher->putLocalReadRequest(request, session_id);
}
else if (!keeper_dispatcher->putRequest(request, session_id))
{
throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "Session {} already disconnected", session_id);
}
if (response_future.wait_for(std::chrono::milliseconds(operation_timeout.totalMilliseconds())) != std::future_status::ready)
throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "Operation timeout ({} ms) exceeded.", operation_timeout.totalMilliseconds());
auto result = response_future.get();
keeper_dispatcher->finishSession(session_id);
return result;
}
void KeeperHTTPStorageHandler::performZooKeeperExistsRequest(const std::string & storage_path, HTTPServerResponse & response)
{
Coordination::ZooKeeperExistsRequest zk_request;
zk_request.path = storage_path;
const auto result_ptr = awaitKeeperResponse(std::make_shared<Coordination::ZooKeeperExistsRequest>(std::move(zk_request)));
auto exists_result_ptr = std::dynamic_pointer_cast<Coordination::ZooKeeperExistsResponse>(result_ptr);
if (!exists_result_ptr)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected result type for get operation.");
if (setErrorResponseForZKCode(exists_result_ptr->error, response))
return;
Poco::JSON::Object response_json;
response_json.set("stat", toJSON(exists_result_ptr->stat));
std::ostringstream oss; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
oss.exceptions(std::ios::failbit);
Poco::JSON::Stringifier::stringify(response_json, oss);
response.setStatus(Poco::Net::HTTPResponse::HTTP_OK);
response.setContentType("application/json");
*response.send() << oss.str();
}
void KeeperHTTPStorageHandler::performZooKeeperListRequest(const std::string & storage_path, HTTPServerResponse & response)
{
Coordination::ZooKeeperListRequest zk_request;
zk_request.path = storage_path;
const auto result_ptr = awaitKeeperResponse(std::make_shared<Coordination::ZooKeeperListRequest>(std::move(zk_request)));
auto list_result_ptr = std::dynamic_pointer_cast<Coordination::ZooKeeperListResponse>(result_ptr);
if (!list_result_ptr)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected result type for list operation.");
if (setErrorResponseForZKCode(list_result_ptr->error, response))
return;
Poco::JSON::Object response_json;
response_json.set("child_node_names", list_result_ptr->names);
response_json.set("stat", toJSON(list_result_ptr->stat));
std::ostringstream oss; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
oss.exceptions(std::ios::failbit);
Poco::JSON::Stringifier::stringify(response_json, oss);
response.setStatus(Poco::Net::HTTPResponse::HTTP_OK);
response.setContentType("application/json");
*response.send() << oss.str();
}
void KeeperHTTPStorageHandler::performZooKeeperGetRequest(const std::string & storage_path, HTTPServerResponse & response)
{
Coordination::ZooKeeperGetRequest zk_request;
zk_request.path = storage_path;
const auto result_ptr = awaitKeeperResponse(std::make_shared<Coordination::ZooKeeperGetRequest>(std::move(zk_request)));
auto get_result_ptr = std::dynamic_pointer_cast<Coordination::ZooKeeperGetResponse>(result_ptr);
if (!get_result_ptr)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected result type for get operation.");
if (setErrorResponseForZKCode(get_result_ptr->error, response))
return;
response.setStatus(Poco::Net::HTTPResponse::HTTP_OK);
response.setContentType("application/octet-stream");
response.setContentLength(get_result_ptr->data.size());
response.send()->write(get_result_ptr->data.c_str(), get_result_ptr->data.size());
response.send()->next();
}
void KeeperHTTPStorageHandler::performZooKeeperSetRequest(
const std::string & storage_path, HTTPServerRequest & request, HTTPServerResponse & response)
{
const auto maybe_request_version = getVersionFromRequest(request);
if (!maybe_request_version.has_value())
{
response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_BAD_REQUEST, "Version parameter is not set or invalid.");
*response.send() << "Version parameter is not set or invalid for set request.\n";
return;
}
Coordination::ZooKeeperSetRequest zk_request;
zk_request.path = storage_path;
zk_request.data = getRawBytesFromRequest(request);
zk_request.version = maybe_request_version.value();
const auto result_ptr = awaitKeeperResponse(std::make_shared<Coordination::ZooKeeperSetRequest>(std::move(zk_request)));
auto set_result_ptr = std::dynamic_pointer_cast<Coordination::ZooKeeperSetResponse>(result_ptr);
if (!set_result_ptr)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected result type for set operation.");
if (setErrorResponseForZKCode(set_result_ptr->error, response))
return;
response.setStatus(Poco::Net::HTTPResponse::HTTP_OK);
response.setContentType("text/plain");
*response.send() << "OK\n";
}
void KeeperHTTPStorageHandler::performZooKeeperCreateRequest(
const std::string & storage_path, HTTPServerRequest & request, HTTPServerResponse & response)
{
Coordination::ZooKeeperCreateRequest zk_request;
zk_request.path = storage_path;
zk_request.data = getRawBytesFromRequest(request);
const auto result_ptr = awaitKeeperResponse(std::make_shared<Coordination::ZooKeeperCreateRequest>(std::move(zk_request)));
auto create_result_ptr = std::dynamic_pointer_cast<Coordination::ZooKeeperCreateResponse>(result_ptr);
if (!create_result_ptr)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected result type for create operation.");
if (setErrorResponseForZKCode(create_result_ptr->error, response))
return;
response.setStatus(Poco::Net::HTTPResponse::HTTP_OK);
response.setContentType("text/plain");
*response.send() << "OK\n";
}
void KeeperHTTPStorageHandler::performZooKeeperRemoveRequest(
const std::string & storage_path, HTTPServerRequest & request, HTTPServerResponse & response)
{
const auto maybe_request_version = getVersionFromRequest(request);
if (!maybe_request_version.has_value())
{
response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_BAD_REQUEST, "Version parameter is not set or invalid.");
*response.send() << "Version parameter is not set or invalid for remove request.\n";
return;
}
Coordination::ZooKeeperRemoveRequest zk_request;
zk_request.path = storage_path;
zk_request.version = maybe_request_version.value();
const auto result_ptr = awaitKeeperResponse(std::make_shared<Coordination::ZooKeeperRemoveRequest>(std::move(zk_request)));
auto remove_result_ptr = std::dynamic_pointer_cast<Coordination::ZooKeeperRemoveResponse>(result_ptr);
if (!remove_result_ptr)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected result type for remove operation.");
if (setErrorResponseForZKCode(remove_result_ptr->error, response))
return;
response.setStatus(Poco::Net::HTTPResponse::HTTP_OK);
response.setContentType("text/plain");
*response.send() << "OK\n";
}
void KeeperHTTPStorageHandler::handleRequest(
HTTPServerRequest & request, HTTPServerResponse & response, const ProfileEvents::Event & /*write_event*/)
try
{
static const auto uri_segments_prefix_length = 3; /// /api/v1/storage
static const std::unordered_map<std::string, Coordination::OpNum> supported_storage_operations = {
{"exists", Coordination::OpNum::Exists},
{"list", Coordination::OpNum::List},
{"get", Coordination::OpNum::Get},
{"set", Coordination::OpNum::Set},
{"create", Coordination::OpNum::Create},
{"remove", Coordination::OpNum::Remove},
};
std::vector<std::string> uri_segments;
try
{
Poco::URI uri(request.getURI());
uri.getPathSegments(uri_segments);
}
catch (...)
{
response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_BAD_REQUEST, "Could not parse request path.");
*response.send() << "Could not parse request path. Check if special symbols are used.\n";
return;
}
// non-strict path "/api/v1/storage" filter is already attached
if (uri_segments.size() <= uri_segments_prefix_length)
{
response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_BAD_REQUEST, "Invalid storage request path.");
*response.send() << "Invalid storage request path.\n";
return;
}
const auto & operation_name = uri_segments[uri_segments_prefix_length];
if (!supported_storage_operations.contains(operation_name))
{
response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_BAD_REQUEST, "Storage operation is not supported.");
*response.send() << "Storage operation is not supported.\n";
return;
}
const auto opnum = supported_storage_operations.at(operation_name);
std::string storage_path;
for (size_t i = uri_segments_prefix_length + 1; i < uri_segments.size(); ++i)
storage_path += ("/" + uri_segments[i]);
if (storage_path.empty())
storage_path = "/";
setResponseDefaultHeaders(response);
if (keeper_dispatcher->isServerActive())
{
try
{
performZooKeeperRequest(opnum, storage_path, request, response);
return;
}
catch (...)
{
tryLogCurrentException(log, "Error when executing Keeper storage operation: " + operation_name);
response.setStatus(Poco::Net::HTTPResponse::HTTP_INTERNAL_SERVER_ERROR);
*response.send() << getCurrentExceptionMessage(false) << '\n';
}
}
else
{
LOG_WARNING(log, "Ignoring user request, because the server is not active yet");
response.setStatus(Poco::Net::HTTPResponse::HTTP_SERVICE_UNAVAILABLE);
*response.send() << "Service Unavailable.\n";
}
}
catch (...)
{
tryLogCurrentException(log);
try
{
response.setStatus(Poco::Net::HTTPResponse::HTTP_INTERNAL_SERVER_ERROR);
if (!response.sent())
{
/// We have not sent anything yet and we don't even know if we need to compress response.
*response.send() << getCurrentExceptionMessage(false) << '\n';
}
}
catch (...)
{
LOG_ERROR(log, "Cannot send exception to client");
}
}
}
#endif

View File

@ -0,0 +1,46 @@
#pragma once
#include <cstdint>
#include "config.h"
#if USE_NURAFT
#include <Coordination/KeeperDispatcher.h>
#include <Server/HTTP/HTTPRequestHandler.h>
namespace DB
{
class IServer;
/// Response with the storage info or perform an action on a given node from request
class KeeperHTTPStorageHandler : public HTTPRequestHandler
{
private:
LoggerPtr log;
const IServer & server;
std::shared_ptr<KeeperDispatcher> keeper_dispatcher;
Poco::Timespan session_timeout;
Poco::Timespan operation_timeout;
public:
KeeperHTTPStorageHandler(const IServer & server_, std::shared_ptr<KeeperDispatcher> keeper_dispatcher_);
void handleRequest(HTTPServerRequest & request, HTTPServerResponse & response, const ProfileEvents::Event & write_event) override;
private:
Coordination::ZooKeeperResponsePtr awaitKeeperResponse(std::shared_ptr<Coordination::ZooKeeperRequest> request);
void performZooKeeperRequest(
Coordination::OpNum opnum, const std::string & storage_path, HTTPServerRequest & request, HTTPServerResponse & response);
void performZooKeeperExistsRequest(const std::string & storage_path, HTTPServerResponse & response);
void performZooKeeperListRequest(const std::string & storage_path, HTTPServerResponse & response);
void performZooKeeperGetRequest(const std::string & storage_path, HTTPServerResponse & response);
void performZooKeeperSetRequest(const std::string & storage_path, HTTPServerRequest & request, HTTPServerResponse & response);
void performZooKeeperCreateRequest(const std::string & storage_path, HTTPServerRequest & request, HTTPServerResponse & response);
void performZooKeeperRemoveRequest(const std::string & storage_path, HTTPServerRequest & request, HTTPServerResponse & response);
};
}
#endif

View File

@ -0,0 +1,27 @@
#include <Server/KeeperNotFoundHandler.h>
#include <IO/HTTPCommon.h>
#include <IO/Operators.h>
#include <Common/Exception.h>
namespace DB
{
void KeeperNotFoundHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse & response, const ProfileEvents::Event & /*write_event*/)
{
try
{
response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_NOT_FOUND);
*response.send() << "There is no handle " << request.getURI()
<< (!hints.empty() ? fmt::format(". Maybe you meant {}.", hints.front()) : "") << "\n\n"
<< "Use /ready for health checks.\n"
<< "Or /api/v1/commands for more sophisticated health checks and monitoring.\n\n"
<< "Use Web UI monitoring panel at /dashboard.\n\n"
<< "Access Keeper storage directly with /api/v1/storage POST or GET methods.\n";
}
catch (...)
{
tryLogCurrentException("KeeperNotFoundHandler");
}
}
}

View File

@ -0,0 +1,18 @@
#pragma once
#include <Server/HTTP/HTTPRequestHandler.h>
namespace DB
{
/// Response with 404 and verbose description.
class KeeperNotFoundHandler : public HTTPRequestHandler
{
public:
explicit KeeperNotFoundHandler(std::vector<std::string> hints_) : hints(std::move(hints_)) {}
void handleRequest(HTTPServerRequest & request, HTTPServerResponse & response, const ProfileEvents::Event & write_event) override;
private:
std::vector<std::string> hints;
};
}

View File

@ -1,95 +0,0 @@
#include <Server/KeeperReadinessHandler.h>
#if USE_NURAFT
#include <memory>
#include <IO/HTTPCommon.h>
#include <IO/Operators.h>
#include <Coordination/KeeperDispatcher.h>
#include <Server/HTTPHandlerFactory.h>
#include <Server/HTTPHandlerRequestFilter.h>
#include <Server/HTTP/WriteBufferFromHTTPServerResponse.h>
#include <Poco/Net/HTTPRequestHandlerFactory.h>
#include <Poco/Net/HTTPServerRequest.h>
#include <Poco/Net/HTTPServerResponse.h>
#include <Poco/JSON/JSON.h>
#include <Poco/JSON/Object.h>
#include <Poco/JSON/Stringifier.h>
namespace DB
{
void KeeperReadinessHandler::handleRequest(HTTPServerRequest & /*request*/, HTTPServerResponse & response, const ProfileEvents::Event & /*write_event*/)
{
try
{
auto is_leader = keeper_dispatcher->isLeader();
auto is_follower = keeper_dispatcher->isFollower() && keeper_dispatcher->hasLeader();
auto is_observer = keeper_dispatcher->isObserver() && keeper_dispatcher->hasLeader();
auto data = keeper_dispatcher->getKeeper4LWInfo();
auto status = is_leader || is_follower || is_observer;
Poco::JSON::Object json, details;
details.set("role", data.getRole());
details.set("hasLeader", keeper_dispatcher->hasLeader());
json.set("details", details);
json.set("status", status ? "ok" : "fail");
std::ostringstream oss; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
oss.exceptions(std::ios::failbit);
Poco::JSON::Stringifier::stringify(json, oss);
if (!status)
response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_SERVICE_UNAVAILABLE);
*response.send() << oss.str();
}
catch (...)
{
tryLogCurrentException("KeeperReadinessHandler");
try
{
response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_INTERNAL_SERVER_ERROR);
if (!response.sent())
{
/// We have not sent anything yet and we don't even know if we need to compress response.
*response.send() << getCurrentExceptionMessage(false) << '\n';
}
}
catch (...)
{
LOG_ERROR((getLogger("KeeperReadinessHandler")), "Cannot send exception to client");
}
}
}
HTTPRequestHandlerFactoryPtr createKeeperHTTPControlMainHandlerFactory(
const Poco::Util::AbstractConfiguration & config,
std::shared_ptr<KeeperDispatcher> keeper_dispatcher,
const std::string & name)
{
auto factory = std::make_shared<HTTPRequestHandlerFactoryMain>(name);
using Factory = HandlingRuleHTTPHandlerFactory<KeeperReadinessHandler>;
Factory::Creator creator = [keeper_dispatcher]() -> std::unique_ptr<KeeperReadinessHandler>
{
return std::make_unique<KeeperReadinessHandler>(keeper_dispatcher);
};
auto readiness_handler = std::make_shared<Factory>(std::move(creator));
readiness_handler->attachStrictPath(config.getString("keeper_server.http_control.readiness.endpoint", "/ready"));
readiness_handler->allowGetAndHeadRequest();
factory->addHandler(readiness_handler);
return factory;
}
}
#endif

View File

@ -1,36 +0,0 @@
#pragma once
#include <config.h>
#if USE_NURAFT
#include <Server/HTTP/HTTPRequestHandler.h>
#include <Server/HTTP/HTTPRequestHandlerFactory.h>
#include <Coordination/KeeperDispatcher.h>
namespace DB
{
class KeeperReadinessHandler : public HTTPRequestHandler, WithContext
{
private:
std::shared_ptr<KeeperDispatcher> keeper_dispatcher;
public:
explicit KeeperReadinessHandler(std::shared_ptr<KeeperDispatcher> keeper_dispatcher_)
: keeper_dispatcher(keeper_dispatcher_)
{
}
void handleRequest(HTTPServerRequest & request, HTTPServerResponse & response, const ProfileEvents::Event & write_event) override;
};
HTTPRequestHandlerFactoryPtr
createKeeperHTTPControlMainHandlerFactory(
const Poco::Util::AbstractConfiguration & config,
std::shared_ptr<KeeperDispatcher> keeper_dispatcher,
const std::string & name);
}
#endif

View File

@ -72,3 +72,29 @@ def test_http_readiness_partitioned_cluster(started_cluster):
assert readiness_data["status"] == "fail"
assert readiness_data["details"]["role"] == "follower"
assert readiness_data["details"]["hasLeader"] == False
def test_http_commands_basic_responses(started_cluster):
leader = keeper_utils.get_leader(cluster, [node1, node2, node3])
response = requests.get(
"http://{host}:{port}/api/v1/commands/conf".format(
host=leader.ip_address, port=9182
)
)
assert response.status_code == 200
command_data = response.json()
assert command_data["result"] == keeper_utils.send_4lw_cmd(cluster, leader, "conf")
follower = keeper_utils.get_any_follower(cluster, [node1, node2, node3])
response = requests.get(
"http://{host}:{port}/api/v1/commands/conf".format(
host=follower.ip_address, port=9182
)
)
assert response.status_code == 200
command_data = response.json()
assert command_data["result"] == keeper_utils.send_4lw_cmd(
cluster, follower, "conf"
)

View File

@ -0,0 +1,37 @@
<clickhouse>
<keeper_server>
<tcp_port>9181</tcp_port>
<server_id>1</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
<raft_logs_level>trace</raft_logs_level>
</coordination_settings>
<raft_configuration>
<server>
<id>1</id>
<hostname>node1</hostname>
<port>9234</port>
</server>
<server>
<id>2</id>
<hostname>node2</hostname>
<port>9234</port>
<start_as_follower>true</start_as_follower>
</server>
<server>
<id>3</id>
<hostname>node3</hostname>
<port>9234</port>
<start_as_follower>true</start_as_follower>
</server>
</raft_configuration>
<http_control>
<port>9182</port>
</http_control>
</keeper_server>
</clickhouse>

View File

@ -0,0 +1,37 @@
<clickhouse>
<keeper_server>
<tcp_port>9181</tcp_port>
<server_id>2</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
<raft_logs_level>trace</raft_logs_level>
</coordination_settings>
<raft_configuration>
<server>
<id>1</id>
<hostname>node1</hostname>
<port>9234</port>
</server>
<server>
<id>2</id>
<hostname>node2</hostname>
<port>9234</port>
<start_as_follower>true</start_as_follower>
</server>
<server>
<id>3</id>
<hostname>node3</hostname>
<port>9234</port>
<start_as_follower>true</start_as_follower>
</server>
</raft_configuration>
<http_control>
<port>9182</port>
</http_control>
</keeper_server>
</clickhouse>

View File

@ -0,0 +1,37 @@
<clickhouse>
<keeper_server>
<tcp_port>9181</tcp_port>
<server_id>3</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
<raft_logs_level>trace</raft_logs_level>
</coordination_settings>
<raft_configuration>
<server>
<id>1</id>
<hostname>node1</hostname>
<port>9234</port>
</server>
<server>
<id>2</id>
<hostname>node2</hostname>
<port>9234</port>
<start_as_follower>true</start_as_follower>
</server>
<server>
<id>3</id>
<hostname>node3</hostname>
<port>9234</port>
<start_as_follower>true</start_as_follower>
</server>
</raft_configuration>
<http_control>
<port>9182</port>
</http_control>
</keeper_server>
</clickhouse>

View File

@ -0,0 +1,121 @@
#!/usr/bin/env python3
import os
import pytest
import requests
from helpers.cluster import ClickHouseCluster
from helpers.network import PartitionManager
import helpers.keeper_utils as keeper_utils
cluster = ClickHouseCluster(__file__)
CONFIG_DIR = os.path.join(os.path.dirname(os.path.realpath(__file__)), "configs")
node1 = cluster.add_instance(
"node1", main_configs=["configs/enable_keeper1.xml"], stay_alive=True
)
node2 = cluster.add_instance(
"node2", main_configs=["configs/enable_keeper2.xml"], stay_alive=True
)
node3 = cluster.add_instance(
"node3", main_configs=["configs/enable_keeper3.xml"], stay_alive=True
)
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def send_storage_request(
node, path, data=None, params=None, expected_response_code=200
):
response = requests.post(
"http://{host}:9182/api/v1/storage{path}".format(
host=node.ip_address, path=path
),
data=data,
params=params,
)
assert response.status_code == expected_response_code
return response
def test_keeper_http_storage_create_get_exists(started_cluster):
follower = keeper_utils.get_any_follower(cluster, [node1, node2, node3])
test_content = b"test_data"
send_storage_request(follower, "/create/test_storage_get", test_content)
send_storage_request(follower, "/exists/test_storage_get")
response = send_storage_request(follower, "/get/test_storage_get")
assert response.content == test_content
send_storage_request(
follower, "/get/test_storage_get/not_found", expected_response_code=404
)
send_storage_request(
follower, "/exists/test_storage_get/not_found", expected_response_code=404
)
def test_keeper_http_storage_set(started_cluster):
follower = keeper_utils.get_any_follower(cluster, [node1, node2, node3])
send_storage_request(follower, "/create/test_storage_set")
response = send_storage_request(follower, "/get/test_storage_set")
assert response.content == b""
test_content = b"test_content"
send_storage_request(
follower, "/set/test_storage_set", test_content, params={"version": 0}
)
response = send_storage_request(follower, "/get/test_storage_set")
assert response.content == test_content
# version is not set
send_storage_request(
follower, "/set/test_storage_set", test_content, expected_response_code=400
)
# node not found
send_storage_request(
follower,
"/set/test_storage_set/not_found",
test_content,
params={"version": 0},
expected_response_code=404,
)
def test_keeper_http_storage_list_remove(started_cluster):
follower = keeper_utils.get_any_follower(cluster, [node1, node2, node3])
send_storage_request(follower, "/create/test_storage_list")
send_storage_request(follower, "/create/test_storage_list/a")
send_storage_request(follower, "/create/test_storage_list/b")
send_storage_request(follower, "/create/test_storage_list/c")
response = send_storage_request(follower, "/list/test_storage_list")
assert sorted(response.json()["child_node_names"]) == ["a", "b", "c"]
send_storage_request(follower, "/remove/test_storage_list/b", params={"version": 0})
response = send_storage_request(follower, "/list/test_storage_list")
assert sorted(response.json()["child_node_names"]) == ["a", "c"]
# version is not set
send_storage_request(
follower, "/remove/test_storage_list/a", expected_response_code=400
)
response = send_storage_request(
follower, "/list/test_storage_list/not_found", expected_response_code=404
)