Merge pull request #10547 from ClickHouse/zhang2014/feature/ISSUES-5436

Merging #7572
This commit is contained in:
tavplubix 2020-04-28 12:40:48 +03:00 committed by GitHub
commit 11c3493676
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
26 changed files with 1094 additions and 324 deletions

View File

@ -4,14 +4,13 @@ set(CLICKHOUSE_SERVER_SOURCES
${CMAKE_CURRENT_SOURCE_DIR}/InterserverIOHTTPHandler.cpp ${CMAKE_CURRENT_SOURCE_DIR}/InterserverIOHTTPHandler.cpp
${CMAKE_CURRENT_SOURCE_DIR}/MetricsTransmitter.cpp ${CMAKE_CURRENT_SOURCE_DIR}/MetricsTransmitter.cpp
${CMAKE_CURRENT_SOURCE_DIR}/NotFoundHandler.cpp ${CMAKE_CURRENT_SOURCE_DIR}/NotFoundHandler.cpp
${CMAKE_CURRENT_SOURCE_DIR}/PingRequestHandler.cpp
${CMAKE_CURRENT_SOURCE_DIR}/PrometheusMetricsWriter.cpp ${CMAKE_CURRENT_SOURCE_DIR}/PrometheusMetricsWriter.cpp
${CMAKE_CURRENT_SOURCE_DIR}/PrometheusRequestHandler.cpp ${CMAKE_CURRENT_SOURCE_DIR}/PrometheusRequestHandler.cpp
${CMAKE_CURRENT_SOURCE_DIR}/ReplicasStatusHandler.cpp ${CMAKE_CURRENT_SOURCE_DIR}/ReplicasStatusHandler.cpp
${CMAKE_CURRENT_SOURCE_DIR}/RootRequestHandler.cpp ${CMAKE_CURRENT_SOURCE_DIR}/StaticRequestHandler.cpp
${CMAKE_CURRENT_SOURCE_DIR}/Server.cpp ${CMAKE_CURRENT_SOURCE_DIR}/Server.cpp
${CMAKE_CURRENT_SOURCE_DIR}/TCPHandler.cpp ${CMAKE_CURRENT_SOURCE_DIR}/TCPHandler.cpp
) )
set(CLICKHOUSE_SERVER_SOURCES set(CLICKHOUSE_SERVER_SOURCES
${CLICKHOUSE_SERVER_SOURCES} ${CLICKHOUSE_SERVER_SOURCES}

View File

@ -1,5 +1,8 @@
#include "HTTPHandler.h" #include "HTTPHandler.h"
#include "HTTPHandlerFactory.h"
#include "HTTPHandlerRequestFilter.h"
#include <chrono> #include <chrono>
#include <iomanip> #include <iomanip>
#include <Poco/File.h> #include <Poco/File.h>
@ -7,6 +10,7 @@
#include <Poco/Net/HTTPServerRequest.h> #include <Poco/Net/HTTPServerRequest.h>
#include <Poco/Net/HTTPServerRequestImpl.h> #include <Poco/Net/HTTPServerRequestImpl.h>
#include <Poco/Net/HTTPServerResponse.h> #include <Poco/Net/HTTPServerResponse.h>
#include <Poco/Net/HTTPRequestHandlerFactory.h>
#include <Poco/Net/NetException.h> #include <Poco/Net/NetException.h>
#include <ext/scope_guard.h> #include <ext/scope_guard.h>
#include <Core/ExternalTable.h> #include <Core/ExternalTable.h>
@ -32,6 +36,7 @@
#include <IO/WriteBufferFromTemporaryFile.h> #include <IO/WriteBufferFromTemporaryFile.h>
#include <DataStreams/IBlockInputStream.h> #include <DataStreams/IBlockInputStream.h>
#include <Interpreters/executeQuery.h> #include <Interpreters/executeQuery.h>
#include <Interpreters/QueryParameterVisitor.h>
#include <Common/typeid_cast.h> #include <Common/typeid_cast.h>
#include <Poco/Net/HTTPStream.h> #include <Poco/Net/HTTPStream.h>
@ -54,6 +59,7 @@ namespace ErrorCodes
extern const int CANNOT_PARSE_DATETIME; extern const int CANNOT_PARSE_DATETIME;
extern const int CANNOT_PARSE_NUMBER; extern const int CANNOT_PARSE_NUMBER;
extern const int CANNOT_OPEN_FILE; extern const int CANNOT_OPEN_FILE;
extern const int CANNOT_COMPILE_REGEXP;
extern const int UNKNOWN_ELEMENT_IN_AST; extern const int UNKNOWN_ELEMENT_IN_AST;
extern const int UNKNOWN_TYPE_OF_AST_NODE; extern const int UNKNOWN_TYPE_OF_AST_NODE;
@ -78,6 +84,7 @@ namespace ErrorCodes
extern const int UNKNOWN_FORMAT; extern const int UNKNOWN_FORMAT;
extern const int UNKNOWN_DATABASE_ENGINE; extern const int UNKNOWN_DATABASE_ENGINE;
extern const int UNKNOWN_TYPE_OF_QUERY; extern const int UNKNOWN_TYPE_OF_QUERY;
extern const int NO_ELEMENTS_IN_CONFIG;
extern const int QUERY_IS_TOO_LARGE; extern const int QUERY_IS_TOO_LARGE;
@ -204,9 +211,9 @@ void HTTPHandler::pushDelayedResults(Output & used_output)
} }
HTTPHandler::HTTPHandler(IServer & server_) HTTPHandler::HTTPHandler(IServer & server_, const std::string & name)
: server(server_) : server(server_)
, log(&Logger::get("HTTPHandler")) , log(&Logger::get(name))
{ {
server_display_name = server.config().getString("display_name", getFQDNOrHostName()); server_display_name = server.config().getString("display_name", getFQDNOrHostName());
} }
@ -226,13 +233,6 @@ void HTTPHandler::processQuery(
std::istream & istr = request.stream(); std::istream & istr = request.stream();
/// Part of the query can be passed in the 'query' parameter and the rest in the request body
/// (http method need not necessarily be POST). In this case the entire query consists of the
/// contents of the 'query' parameter, a line break and the request body.
std::string query_param = params.get("query", "");
if (!query_param.empty())
query_param += '\n';
/// The user and password can be passed by headers (similar to X-Auth-*), /// The user and password can be passed by headers (similar to X-Auth-*),
/// which is used by load balancers to pass authentication information. /// which is used by load balancers to pass authentication information.
std::string user = request.get("X-ClickHouse-User", ""); std::string user = request.get("X-ClickHouse-User", "");
@ -390,8 +390,6 @@ void HTTPHandler::processQuery(
used_output.out_maybe_delayed_and_compressed = used_output.out_maybe_compressed; used_output.out_maybe_delayed_and_compressed = used_output.out_maybe_compressed;
} }
std::unique_ptr<ReadBuffer> in_param = std::make_unique<ReadBufferFromString>(query_param);
std::unique_ptr<ReadBuffer> in_post_raw = std::make_unique<ReadBufferFromIStream>(istr); std::unique_ptr<ReadBuffer> in_post_raw = std::make_unique<ReadBufferFromIStream>(istr);
/// Request body can be compressed using algorithm specified in the Content-Encoding header. /// Request body can be compressed using algorithm specified in the Content-Encoding header.
@ -413,7 +411,7 @@ void HTTPHandler::processQuery(
std::unique_ptr<ReadBuffer> in; std::unique_ptr<ReadBuffer> in;
static const NameSet reserved_param_names{"query", "compress", "decompress", "user", "password", "quota_key", "query_id", "stacktrace", static const NameSet reserved_param_names{"compress", "decompress", "user", "password", "quota_key", "query_id", "stacktrace",
"buffer_size", "wait_end_of_query", "session_id", "session_timeout", "session_check"}; "buffer_size", "wait_end_of_query", "session_id", "session_timeout", "session_check"};
Names reserved_param_suffixes; Names reserved_param_suffixes;
@ -478,16 +476,11 @@ void HTTPHandler::processQuery(
else if (param_could_be_skipped(key)) else if (param_could_be_skipped(key))
{ {
} }
else if (startsWith(key, "param_"))
{
/// Save name and values of substitution in dictionary.
const String parameter_name = key.substr(strlen("param_"));
context.setQueryParameter(parameter_name, value);
}
else else
{ {
/// All other query parameters are treated as settings. /// Other than query parameters are treated as settings.
settings_changes.push_back({key, value}); if (!customizeQueryParam(context, key, value))
settings_changes.push_back({key, value});
} }
} }
@ -495,25 +488,9 @@ void HTTPHandler::processQuery(
context.checkSettingsConstraints(settings_changes); context.checkSettingsConstraints(settings_changes);
context.applySettingsChanges(settings_changes); context.applySettingsChanges(settings_changes);
/// Used in case of POST request with form-data, but it isn't expected to be deleted after that scope. const auto & query = getQuery(request, params, context);
std::string full_query; std::unique_ptr<ReadBuffer> in_param = std::make_unique<ReadBufferFromString>(query);
in = has_external_data ? std::move(in_param) : std::make_unique<ConcatReadBuffer>(*in_param, *in_post_maybe_compressed);
/// Support for "external data for query processing".
if (has_external_data)
{
ExternalTablesHandler handler(context, params);
params.load(request, istr, handler);
/// Params are of both form params POST and uri (GET params)
for (const auto & it : params)
if (it.first == "query")
full_query += it.second;
in = std::make_unique<ReadBufferFromString>(full_query);
}
else
in = std::make_unique<ConcatReadBuffer>(*in_param, *in_post_maybe_compressed);
/// HTTP response compression is turned on only if the client signalled that they support it /// HTTP response compression is turned on only if the client signalled that they support it
/// (using Accept-Encoding header) and 'enable_http_compression' setting is turned on. /// (using Accept-Encoding header) and 'enable_http_compression' setting is turned on.
@ -593,7 +570,7 @@ void HTTPHandler::processQuery(
}); });
} }
customizeContext(context); customizeContext(request, context);
executeQuery(*in, *used_output.out_maybe_delayed_and_compressed, /* allow_into_outfile = */ false, context, executeQuery(*in, *used_output.out_maybe_delayed_and_compressed, /* allow_into_outfile = */ false, context,
[&response] (const String & current_query_id, const String & content_type, const String & format, const String & timezone) [&response] (const String & current_query_id, const String & content_type, const String & format, const String & timezone)
@ -731,5 +708,191 @@ void HTTPHandler::handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Ne
} }
} }
DynamicQueryHandler::DynamicQueryHandler(IServer & server_, const std::string & param_name_)
: HTTPHandler(server_, "DynamicQueryHandler"), param_name(param_name_)
{
}
bool DynamicQueryHandler::customizeQueryParam(Context & context, const std::string & key, const std::string & value)
{
if (key == param_name)
return true; /// do nothing
if (startsWith(key, "param_"))
{
/// Save name and values of substitution in dictionary.
const String parameter_name = key.substr(strlen("param_"));
context.setQueryParameter(parameter_name, value);
return true;
}
return false;
}
std::string DynamicQueryHandler::getQuery(Poco::Net::HTTPServerRequest & request, HTMLForm & params, Context & context)
{
if (likely(!startsWith(request.getContentType(), "multipart/form-data")))
{
/// Part of the query can be passed in the 'query' parameter and the rest in the request body
/// (http method need not necessarily be POST). In this case the entire query consists of the
/// contents of the 'query' parameter, a line break and the request body.
std::string query_param = params.get(param_name, "");
return query_param.empty() ? query_param : query_param + "\n";
}
/// Support for "external data for query processing".
/// Used in case of POST request with form-data, but it isn't expected to be deleted after that scope.
ExternalTablesHandler handler(context, params);
params.load(request, request.stream(), handler);
std::string full_query;
/// Params are of both form params POST and uri (GET params)
for (const auto & it : params)
if (it.first == param_name)
full_query += it.second;
return full_query;
}
PredefinedQueryHandler::PredefinedQueryHandler(
IServer & server, const NameSet & receive_params_, const std::string & predefined_query_
, const CompiledRegexPtr & url_regex_, const std::unordered_map<String, CompiledRegexPtr> & header_name_with_regex_)
: HTTPHandler(server, "PredefinedQueryHandler"), receive_params(receive_params_), predefined_query(predefined_query_)
, url_regex(url_regex_), header_name_with_capture_regex(header_name_with_regex_)
{
}
bool PredefinedQueryHandler::customizeQueryParam(Context & context, const std::string & key, const std::string & value)
{
if (receive_params.count(key))
{
context.setQueryParameter(key, value);
return true;
}
return false;
}
void PredefinedQueryHandler::customizeContext(Poco::Net::HTTPServerRequest & request, DB::Context & context)
{
/// If in the configuration file, the handler's header is regex and contains named capture group
/// We will extract regex named capture groups as query parameters
const auto & set_query_params = [&](const char * begin, const char * end, const CompiledRegexPtr & compiled_regex)
{
int num_captures = compiled_regex->NumberOfCapturingGroups() + 1;
re2::StringPiece matches[num_captures];
re2::StringPiece input(begin, end - begin);
if (compiled_regex->Match(input, 0, end - begin, re2::RE2::Anchor::ANCHOR_BOTH, matches, num_captures))
{
for (const auto & [capturing_name, capturing_index] : compiled_regex->NamedCapturingGroups())
{
const auto & capturing_value = matches[capturing_index];
if (capturing_value.data())
context.setQueryParameter(capturing_name, String(capturing_value.data(), capturing_value.size()));
}
}
};
if (url_regex)
{
const auto & uri = request.getURI();
set_query_params(uri.data(), find_first_symbols<'?'>(uri.data(), uri.data() + uri.size()), url_regex);
}
for (const auto & [header_name, regex] : header_name_with_capture_regex)
{
const auto & header_value = request.get(header_name);
set_query_params(header_value.data(), header_value.data() + header_value.size(), regex);
}
}
std::string PredefinedQueryHandler::getQuery(Poco::Net::HTTPServerRequest & request, HTMLForm & params, Context & context)
{
if (unlikely(startsWith(request.getContentType(), "multipart/form-data")))
{
/// Support for "external data for query processing".
ExternalTablesHandler handler(context, params);
params.load(request, request.stream(), handler);
}
return predefined_query;
}
Poco::Net::HTTPRequestHandlerFactory * createDynamicHandlerFactory(IServer & server, const std::string & config_prefix)
{
std::string query_param_name = server.config().getString(config_prefix + ".handler.query_param_name", "query");
return addFiltersFromConfig(new HandlingRuleHTTPHandlerFactory<DynamicQueryHandler>(server, std::move(query_param_name)), server.config(), config_prefix);
}
static inline bool capturingNamedQueryParam(NameSet receive_params, const CompiledRegexPtr & compiled_regex)
{
const auto & capturing_names = compiled_regex->NamedCapturingGroups();
return std::count_if(capturing_names.begin(), capturing_names.end(), [&](const auto & iterator)
{
return std::count_if(receive_params.begin(), receive_params.end(),
[&](const auto & param_name) { return param_name == iterator.first; });
});
}
static inline CompiledRegexPtr getCompiledRegex(const std::string & expression)
{
auto compiled_regex = std::make_shared<const re2::RE2>(expression);
if (!compiled_regex->ok())
throw Exception("Cannot compile re2: " + expression + " for http handling rule, error: " +
compiled_regex->error() + ". Look at https://github.com/google/re2/wiki/Syntax for reference.", ErrorCodes::CANNOT_COMPILE_REGEXP);
return compiled_regex;
}
Poco::Net::HTTPRequestHandlerFactory * createPredefinedHandlerFactory(IServer & server, const std::string & config_prefix)
{
Poco::Util::AbstractConfiguration & configuration = server.config();
if (!configuration.has(config_prefix + ".handler.query"))
throw Exception("There is no path '" + config_prefix + ".handler.query" + "' in configuration file.", ErrorCodes::NO_ELEMENTS_IN_CONFIG);
std::string predefined_query = configuration.getString(config_prefix + ".handler.query");
NameSet analyze_receive_params = analyzeReceiveQueryParams(predefined_query);
std::unordered_map<String, CompiledRegexPtr> headers_name_with_regex;
Poco::Util::AbstractConfiguration::Keys headers_name;
configuration.keys(config_prefix + ".headers", headers_name);
for (const auto & header_name : headers_name)
{
auto expression = configuration.getString(config_prefix + ".headers." + header_name);
if (!startsWith(expression, "regex:"))
continue;
expression = expression.substr(6);
auto regex = getCompiledRegex(expression);
if (capturingNamedQueryParam(analyze_receive_params, regex))
headers_name_with_regex.emplace(std::make_pair(header_name, regex));
}
if (configuration.has(config_prefix + ".url"))
{
auto url_expression = configuration.getString(config_prefix + ".url");
if (startsWith(url_expression, "regex:"))
url_expression = url_expression.substr(6);
auto regex = getCompiledRegex(url_expression);
if (capturingNamedQueryParam(analyze_receive_params, regex))
return addFiltersFromConfig(new HandlingRuleHTTPHandlerFactory<PredefinedQueryHandler>(
server, std::move(analyze_receive_params), std::move(predefined_query), std::move(regex),
std::move(headers_name_with_regex)), configuration, config_prefix);
}
return addFiltersFromConfig(new HandlingRuleHTTPHandlerFactory<PredefinedQueryHandler>(
server, std::move(analyze_receive_params), std::move(predefined_query), CompiledRegexPtr{} ,std::move(headers_name_with_regex)),
configuration, config_prefix);
}
} }

View File

@ -7,6 +7,7 @@
#include <Common/CurrentMetrics.h> #include <Common/CurrentMetrics.h>
#include <Common/HTMLForm.h> #include <Common/HTMLForm.h>
#include <re2/re2.h>
namespace CurrentMetrics namespace CurrentMetrics
{ {
@ -20,16 +21,21 @@ namespace DB
class WriteBufferFromHTTPServerResponse; class WriteBufferFromHTTPServerResponse;
typedef std::shared_ptr<const re2::RE2> CompiledRegexPtr;
class HTTPHandler : public Poco::Net::HTTPRequestHandler class HTTPHandler : public Poco::Net::HTTPRequestHandler
{ {
public: public:
explicit HTTPHandler(IServer & server_); explicit HTTPHandler(IServer & server_, const std::string & name);
void handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response) override; void handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response) override;
/// This method is called right before the query execution. /// This method is called right before the query execution.
virtual void customizeContext(DB::Context& /* context */) {} virtual void customizeContext(Poco::Net::HTTPServerRequest & /*request*/, Context & /* context */) {}
virtual bool customizeQueryParam(Context & context, const std::string & key, const std::string & value) = 0;
virtual std::string getQuery(Poco::Net::HTTPServerRequest & request, HTMLForm & params, Context & context) = 0;
private: private:
struct Output struct Output
@ -80,4 +86,35 @@ private:
static void pushDelayedResults(Output & used_output); static void pushDelayedResults(Output & used_output);
}; };
class DynamicQueryHandler : public HTTPHandler
{
private:
std::string param_name;
public:
explicit DynamicQueryHandler(IServer & server_, const std::string & param_name_ = "query");
std::string getQuery(Poco::Net::HTTPServerRequest & request, HTMLForm & params, Context & context) override;
bool customizeQueryParam(Context &context, const std::string &key, const std::string &value) override;
};
class PredefinedQueryHandler : public HTTPHandler
{
private:
NameSet receive_params;
std::string predefined_query;
CompiledRegexPtr url_regex;
std::unordered_map<String, CompiledRegexPtr> header_name_with_capture_regex;
public:
explicit PredefinedQueryHandler(
IServer & server, const NameSet & receive_params_, const std::string & predefined_query_
, const CompiledRegexPtr & url_regex_, const std::unordered_map<String, CompiledRegexPtr> & header_name_with_regex_);
virtual void customizeContext(Poco::Net::HTTPServerRequest & request, Context & context) override;
std::string getQuery(Poco::Net::HTTPServerRequest & request, HTMLForm & params, Context & context) override;
bool customizeQueryParam(Context & context, const std::string & key, const std::string & value) override;
};
} }

View File

@ -1,29 +1,44 @@
#include "HTTPHandlerFactory.h" #include "HTTPHandlerFactory.h"
#include <re2/re2.h>
#include <re2/stringpiece.h>
#include <common/find_symbols.h>
#include <Poco/StringTokenizer.h>
#include "HTTPHandler.h"
#include "NotFoundHandler.h"
#include "StaticRequestHandler.h"
#include "ReplicasStatusHandler.h"
#include "InterserverIOHTTPHandler.h"
#include "PrometheusRequestHandler.h"
namespace DB namespace DB
{ {
HTTPRequestHandlerFactoryMain::HTTPRequestHandlerFactoryMain(IServer & server_, const std::string & name_) namespace ErrorCodes
: server(server_), log(&Logger::get(name_)), name(name_) {
extern const int LOGICAL_ERROR;
extern const int UNKNOWN_ELEMENT_IN_CONFIG;
extern const int INVALID_CONFIG_PARAMETER;
}
HTTPRequestHandlerFactoryMain::HTTPRequestHandlerFactoryMain(const std::string & name_)
: log(&Logger::get(name_)), name(name_)
{ {
} }
Poco::Net::HTTPRequestHandler * HTTPRequestHandlerFactoryMain::createRequestHandler( Poco::Net::HTTPRequestHandler * HTTPRequestHandlerFactoryMain::createRequestHandler(const Poco::Net::HTTPServerRequest & request) // override
const Poco::Net::HTTPServerRequest & request) // override
{ {
LOG_TRACE(log, "HTTP Request for " << name << ". " LOG_TRACE(log, "HTTP Request for " << name << ". "
<< "Method: " << "Method: " << request.getMethod()
<< request.getMethod() << ", Address: " << request.clientAddress().toString()
<< ", Address: " << ", User-Agent: " << (request.has("User-Agent") ? request.get("User-Agent") : "none")
<< request.clientAddress().toString()
<< ", User-Agent: "
<< (request.has("User-Agent") ? request.get("User-Agent") : "none")
<< (request.hasContentLength() ? (", Length: " + std::to_string(request.getContentLength())) : ("")) << (request.hasContentLength() ? (", Length: " + std::to_string(request.getContentLength())) : (""))
<< ", Content Type: " << request.getContentType() << ", Content Type: " << request.getContentType()
<< ", Transfer Encoding: " << request.getTransferEncoding()); << ", Transfer Encoding: " << request.getTransferEncoding());
for (auto & handler_factory : child_handler_factories) for (auto & handler_factory : child_factories)
{ {
auto handler = handler_factory->createRequestHandler(request); auto handler = handler_factory->createRequestHandler(request);
if (handler != nullptr) if (handler != nullptr)
@ -40,4 +55,111 @@ Poco::Net::HTTPRequestHandler * HTTPRequestHandlerFactoryMain::createRequestHand
return nullptr; return nullptr;
} }
HTTPRequestHandlerFactoryMain::~HTTPRequestHandlerFactoryMain()
{
while (!child_factories.empty())
{
delete child_factories.back();
child_factories.pop_back();
}
}
HTTPRequestHandlerFactoryMain::TThis * HTTPRequestHandlerFactoryMain::addHandler(Poco::Net::HTTPRequestHandlerFactory * child_factory)
{
child_factories.emplace_back(child_factory);
return this;
}
static inline auto createHandlersFactoryFromConfig(IServer & server, const std::string & name, const String & prefix)
{
auto main_handler_factory = new HTTPRequestHandlerFactoryMain(name);
try
{
Poco::Util::AbstractConfiguration::Keys keys;
server.config().keys(prefix, keys);
for (const auto & key : keys)
{
if (!startsWith(key, "rule"))
throw Exception("Unknown element in config: " + prefix + "." + key + ", must be 'rule'", ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG);
const auto & handler_type = server.config().getString(prefix + "." + key + ".handler.type", "");
if (handler_type == "static")
main_handler_factory->addHandler(createStaticHandlerFactory(server, prefix + "." + key));
else if (handler_type == "dynamic_query_handler")
main_handler_factory->addHandler(createDynamicHandlerFactory(server, prefix + "." + key));
else if (handler_type == "predefined_query_handler")
main_handler_factory->addHandler(createPredefinedHandlerFactory(server, prefix + "." + key));
else if (handler_type.empty())
throw Exception("Handler type in config is not specified here: " +
prefix + "." + key + ".handler.type", ErrorCodes::INVALID_CONFIG_PARAMETER);
else
throw Exception("Unknown handler type '" + handler_type +"' in config here: " +
prefix + "." + key + ".handler.type",ErrorCodes::INVALID_CONFIG_PARAMETER);
}
return main_handler_factory;
}
catch (...)
{
delete main_handler_factory;
throw;
}
}
static const auto ping_response_expression = "Ok.\n";
static const auto root_response_expression = "config://http_server_default_response";
static inline Poco::Net::HTTPRequestHandlerFactory * createHTTPHandlerFactory(IServer & server, const std::string & name, AsynchronousMetrics & async_metrics)
{
if (server.config().has("http_handlers"))
return createHandlersFactoryFromConfig(server, name, "http_handlers");
else
{
auto factory = (new HTTPRequestHandlerFactoryMain(name))
->addHandler((new HandlingRuleHTTPHandlerFactory<StaticRequestHandler>(server, root_response_expression))
->attachStrictPath("/")->allowGetAndHeadRequest())
->addHandler((new HandlingRuleHTTPHandlerFactory<StaticRequestHandler>(server, ping_response_expression))
->attachStrictPath("/ping")->allowGetAndHeadRequest())
->addHandler((new HandlingRuleHTTPHandlerFactory<ReplicasStatusHandler>(server))
->attachNonStrictPath("/replicas_status")->allowGetAndHeadRequest())
->addHandler((new HandlingRuleHTTPHandlerFactory<DynamicQueryHandler>(server, "query"))->allowPostAndGetParamsRequest());
if (server.config().has("prometheus") && server.config().getInt("prometheus.port", 0) == 0)
factory->addHandler((new HandlingRuleHTTPHandlerFactory<PrometheusRequestHandler>(
server, PrometheusMetricsWriter(server.config(), "prometheus", async_metrics)))
->attachStrictPath(server.config().getString("prometheus.endpoint", "/metrics"))->allowGetAndHeadRequest());
return factory;
}
}
static inline Poco::Net::HTTPRequestHandlerFactory * createInterserverHTTPHandlerFactory(IServer & server, const std::string & name)
{
return (new HTTPRequestHandlerFactoryMain(name))
->addHandler((new HandlingRuleHTTPHandlerFactory<StaticRequestHandler>(server, root_response_expression))
->attachStrictPath("/")->allowGetAndHeadRequest())
->addHandler((new HandlingRuleHTTPHandlerFactory<StaticRequestHandler>(server, ping_response_expression))
->attachStrictPath("/ping")->allowGetAndHeadRequest())
->addHandler((new HandlingRuleHTTPHandlerFactory<ReplicasStatusHandler>(server))
->attachNonStrictPath("/replicas_status")->allowGetAndHeadRequest())
->addHandler((new HandlingRuleHTTPHandlerFactory<InterserverIOHTTPHandler>(server))->allowPostAndGetParamsRequest());
}
Poco::Net::HTTPRequestHandlerFactory * createHandlerFactory(IServer & server, AsynchronousMetrics & async_metrics, const std::string & name)
{
if (name == "HTTPHandler-factory" || name == "HTTPSHandler-factory")
return createHTTPHandlerFactory(server, name, async_metrics);
else if (name == "InterserverIOHTTPHandler-factory" || name == "InterserverIOHTTPSHandler-factory")
return createInterserverHTTPHandlerFactory(server, name);
else if (name == "PrometheusHandler-factory")
return (new HTTPRequestHandlerFactoryMain(name))->addHandler((new HandlingRuleHTTPHandlerFactory<PrometheusRequestHandler>(
server, PrometheusMetricsWriter(server.config(), "prometheus", async_metrics)))
->attachStrictPath(server.config().getString("prometheus.endpoint", "/metrics"))->allowGetAndHeadRequest());
throw Exception("LOGICAL ERROR: Unknown HTTP handler factory name.", ErrorCodes::LOGICAL_ERROR);
}
} }

View File

@ -1,127 +1,114 @@
#pragma once #pragma once
#include <Poco/Net/HTTPRequestHandlerFactory.h> #include "IServer.h"
#include <common/logger_useful.h>
#include <Common/HTMLForm.h>
#include <Common/StringUtils/StringUtils.h>
#include <Poco/Net/HTTPServerRequest.h> #include <Poco/Net/HTTPServerRequest.h>
#include <Poco/Net/HTTPServerResponse.h> #include <Poco/Net/HTTPServerResponse.h>
#include <common/logger_useful.h> #include <Poco/Net/HTTPRequestHandlerFactory.h>
#include "IServer.h" #include <Interpreters/AsynchronousMetrics.h>
#include "HTTPHandler.h"
#include "InterserverIOHTTPHandler.h"
#include "NotFoundHandler.h"
#include "PingRequestHandler.h"
#include "PrometheusRequestHandler.h"
#include "ReplicasStatusHandler.h"
#include "RootRequestHandler.h"
namespace DB namespace DB
{ {
/// Handle request using child handlers /// Handle request using child handlers
class HTTPRequestHandlerFactoryMain : public Poco::Net::HTTPRequestHandlerFactory class HTTPRequestHandlerFactoryMain : public Poco::Net::HTTPRequestHandlerFactory, boost::noncopyable
{ {
private: private:
using TThis = HTTPRequestHandlerFactoryMain; using TThis = HTTPRequestHandlerFactoryMain;
IServer & server;
Logger * log; Logger * log;
std::string name; std::string name;
std::vector<std::unique_ptr<Poco::Net::HTTPRequestHandlerFactory>> child_handler_factories; std::vector<Poco::Net::HTTPRequestHandlerFactory *> child_factories;
public: public:
HTTPRequestHandlerFactoryMain(IServer & server_, const std::string & name_);
~HTTPRequestHandlerFactoryMain();
HTTPRequestHandlerFactoryMain(const std::string & name_);
TThis * addHandler(Poco::Net::HTTPRequestHandlerFactory * child_factory);
Poco::Net::HTTPRequestHandler * createRequestHandler(const Poco::Net::HTTPServerRequest & request) override; Poco::Net::HTTPRequestHandler * createRequestHandler(const Poco::Net::HTTPServerRequest & request) override;
};
template <typename T, typename... TArgs> template <typename TEndpoint>
TThis * addHandler(TArgs &&... args) class HandlingRuleHTTPHandlerFactory : public Poco::Net::HTTPRequestHandlerFactory
{
public:
using TThis = HandlingRuleHTTPHandlerFactory<TEndpoint>;
using Filter = std::function<bool(const Poco::Net::HTTPServerRequest &)>;
template <typename... TArgs>
HandlingRuleHTTPHandlerFactory(TArgs &&... args)
{ {
child_handler_factories.emplace_back(std::make_unique<T>(server, std::forward<TArgs>(args)...)); creator = [args = std::tuple<TArgs...>(std::forward<TArgs>(args) ...)]()
{
return std::apply([&](auto && ... endpoint_args)
{
return new TEndpoint(std::forward<decltype(endpoint_args)>(endpoint_args)...);
}, std::move(args));
};
}
TThis * addFilter(Filter cur_filter)
{
Filter prev_filter = filter;
filter = [prev_filter, cur_filter](const auto & request)
{
return prev_filter ? prev_filter(request) && cur_filter(request) : cur_filter(request);
};
return this; return this;
} }
};
TThis * attachStrictPath(const String & strict_path)
{
return addFilter([strict_path](const auto & request) { return request.getURI() == strict_path; });
}
/// Handle POST or GET with params TThis * attachNonStrictPath(const String & non_strict_path)
template <typename HandleType> {
class HTTPQueryRequestHandlerFactory : public Poco::Net::HTTPRequestHandlerFactory return addFilter([non_strict_path](const auto & request) { return startsWith(request.getURI(), non_strict_path); });
{ }
private:
IServer & server;
public: /// Handle GET or HEAD endpoint on specified path
HTTPQueryRequestHandlerFactory(IServer & server_) : server(server_) {} TThis * allowGetAndHeadRequest()
{
return addFilter([](const auto & request)
{
return request.getMethod() == Poco::Net::HTTPRequest::HTTP_GET
|| request.getMethod() == Poco::Net::HTTPRequest::HTTP_HEAD;
});
}
/// Handle POST or GET with params
TThis * allowPostAndGetParamsRequest()
{
return addFilter([](const auto & request)
{
return request.getURI().find('?') != std::string::npos
|| request.getMethod() == Poco::Net::HTTPRequest::HTTP_POST;
});
}
Poco::Net::HTTPRequestHandler * createRequestHandler(const Poco::Net::HTTPServerRequest & request) override Poco::Net::HTTPRequestHandler * createRequestHandler(const Poco::Net::HTTPServerRequest & request) override
{ {
if (request.getURI().find('?') != std::string::npos || request.getMethod() == Poco::Net::HTTPRequest::HTTP_POST) return filter(request) ? creator() : nullptr;
return new HandleType(server);
return nullptr;
} }
};
/// Handle GET or HEAD endpoint on specified path
template <typename TGetEndpoint>
class HTTPGetRequestHandlerFactory : public Poco::Net::HTTPRequestHandlerFactory
{
private: private:
IServer & server; Filter filter;
public: std::function<Poco::Net::HTTPRequestHandler * ()> creator;
HTTPGetRequestHandlerFactory(IServer & server_) : server(server_) {}
Poco::Net::HTTPRequestHandler * createRequestHandler(const Poco::Net::HTTPServerRequest & request) override
{
auto & method = request.getMethod();
if (!(method == Poco::Net::HTTPRequest::HTTP_GET || method == Poco::Net::HTTPRequest::HTTP_HEAD))
return nullptr;
auto & uri = request.getURI();
bool uri_match = TGetEndpoint::strict_path ? uri == TGetEndpoint::path : startsWith(uri, TGetEndpoint::path);
if (uri_match)
return new typename TGetEndpoint::HandleType(server);
return nullptr;
}
}; };
Poco::Net::HTTPRequestHandlerFactory * createStaticHandlerFactory(IServer & server, const std::string & config_prefix);
struct RootEndpoint Poco::Net::HTTPRequestHandlerFactory * createDynamicHandlerFactory(IServer & server, const std::string & config_prefix);
{
static constexpr auto path = "/";
static constexpr auto strict_path = true;
using HandleType = RootRequestHandler;
};
struct PingEndpoint Poco::Net::HTTPRequestHandlerFactory * createPredefinedHandlerFactory(IServer & server, const std::string & config_prefix);
{
static constexpr auto path = "/ping";
static constexpr auto strict_path = true;
using HandleType = PingRequestHandler;
};
struct ReplicasStatusEndpoint
{
static constexpr auto path = "/replicas_status";
static constexpr auto strict_path = false;
using HandleType = ReplicasStatusHandler;
};
using HTTPRootRequestHandlerFactory = HTTPGetRequestHandlerFactory<RootEndpoint>;
using HTTPPingRequestHandlerFactory = HTTPGetRequestHandlerFactory<PingEndpoint>;
using HTTPReplicasStatusRequestHandlerFactory = HTTPGetRequestHandlerFactory<ReplicasStatusEndpoint>;
template <typename HandleType>
HTTPRequestHandlerFactoryMain * createDefaultHandlerFatory(IServer & server, const std::string & name)
{
auto handlerFactory = new HTTPRequestHandlerFactoryMain(server, name);
handlerFactory->addHandler<HTTPRootRequestHandlerFactory>()
->addHandler<HTTPPingRequestHandlerFactory>()
->addHandler<HTTPReplicasStatusRequestHandlerFactory>()
->addHandler<HTTPQueryRequestHandlerFactory<HandleType>>();
return handlerFactory;
}
Poco::Net::HTTPRequestHandlerFactory * createHandlerFactory(IServer & server, AsynchronousMetrics & async_metrics, const std::string & name);
} }

View File

@ -0,0 +1,127 @@
#pragma once
#include "HTTPHandlerFactory.h"
#include <re2/re2.h>
#include <re2/stringpiece.h>
#include <Poco/StringTokenizer.h>
#include <Poco/Net/HTTPServerRequest.h>
#include <common/find_symbols.h>
namespace DB
{
namespace ErrorCodes
{
extern const int CANNOT_COMPILE_REGEXP;
extern const int UNKNOWN_ELEMENT_IN_CONFIG;
}
typedef std::shared_ptr<const re2::RE2> CompiledRegexPtr;
static inline bool checkRegexExpression(const StringRef & match_str, const CompiledRegexPtr & compiled_regex)
{
int num_captures = compiled_regex->NumberOfCapturingGroups() + 1;
re2::StringPiece matches[num_captures];
re2::StringPiece match_input(match_str.data, match_str.size);
return compiled_regex->Match(match_input, 0, match_str.size, re2::RE2::Anchor::ANCHOR_BOTH, matches, num_captures);
}
static inline bool checkExpression(const StringRef & match_str, const std::pair<String, CompiledRegexPtr> & expression)
{
if (expression.second)
return checkRegexExpression(match_str, expression.second);
return match_str == expression.first;
}
static inline auto methodsFilter(Poco::Util::AbstractConfiguration & config, const std::string & config_path)
{
std::vector<String> methods;
Poco::StringTokenizer tokenizer(config.getString(config_path), ",");
for (auto iterator = tokenizer.begin(); iterator != tokenizer.end(); ++iterator)
methods.emplace_back(Poco::toUpper(Poco::trim(*iterator)));
return [methods](const Poco::Net::HTTPServerRequest & request) { return std::count(methods.begin(), methods.end(), request.getMethod()); };
}
static inline auto getExpression(const std::string & expression)
{
if (!startsWith(expression, "regex:"))
return std::make_pair(expression, CompiledRegexPtr{});
auto compiled_regex = std::make_shared<const re2::RE2>(expression.substr(6));
if (!compiled_regex->ok())
throw Exception("cannot compile re2: " + expression + " for http handling rule, error: " + compiled_regex->error() +
". Look at https://github.com/google/re2/wiki/Syntax for reference.", ErrorCodes::CANNOT_COMPILE_REGEXP);
return std::make_pair(expression, compiled_regex);
}
static inline auto urlFilter(Poco::Util::AbstractConfiguration & config, const std::string & config_path)
{
return [expression = getExpression(config.getString(config_path))](const Poco::Net::HTTPServerRequest & request)
{
const auto & uri = request.getURI();
const auto & end = find_first_symbols<'?'>(uri.data(), uri.data() + uri.size());
return checkExpression(StringRef(uri.data(), end - uri.data()), expression);
};
}
static inline auto headersFilter(Poco::Util::AbstractConfiguration & config, const std::string & prefix)
{
std::unordered_map<String, std::pair<String, CompiledRegexPtr>> headers_expression;
Poco::Util::AbstractConfiguration::Keys headers_name;
config.keys(prefix, headers_name);
for (const auto & header_name : headers_name)
{
const auto & expression = getExpression(config.getString(prefix + "." + header_name));
checkExpression("", expression); /// Check expression syntax is correct
headers_expression.emplace(std::make_pair(header_name, expression));
}
return [headers_expression](const Poco::Net::HTTPServerRequest & request)
{
for (const auto & [header_name, header_expression] : headers_expression)
{
const auto & header_value = request.get(header_name, "");
if (!checkExpression(StringRef(header_value.data(), header_value.size()), header_expression))
return false;
}
return true;
};
}
template <typename TEndpoint>
static inline Poco::Net::HTTPRequestHandlerFactory * addFiltersFromConfig(
HandlingRuleHTTPHandlerFactory <TEndpoint> * factory, Poco::Util::AbstractConfiguration & config, const std::string & prefix)
{
Poco::Util::AbstractConfiguration::Keys filters_type;
config.keys(prefix, filters_type);
for (const auto & filter_type : filters_type)
{
if (filter_type == "handler")
continue;
else if (filter_type == "url")
factory->addFilter(urlFilter(config, prefix + ".url"));
else if (filter_type == "headers")
factory->addFilter(headersFilter(config, prefix + ".headers"));
else if (filter_type == "methods")
factory->addFilter(methodsFilter(config, prefix + ".methods"));
else
throw Exception("Unknown element in config: " + prefix + "." + filter_type, ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG);
}
return factory;
}
}

View File

@ -34,4 +34,4 @@ void NotFoundHandler::handleRequest(
} }
} }
} }

View File

@ -15,4 +15,4 @@ public:
Poco::Net::HTTPServerResponse & response) override; Poco::Net::HTTPServerResponse & response) override;
}; };
} }

View File

@ -1,31 +0,0 @@
#include "PingRequestHandler.h"
#include <IO/HTTPCommon.h>
#include <Common/Exception.h>
#include <Poco/Net/HTTPServerRequest.h>
#include <Poco/Net/HTTPServerResponse.h>
namespace DB
{
void PingRequestHandler::handleRequest(
Poco::Net::HTTPServerRequest &,
Poco::Net::HTTPServerResponse & response)
{
try
{
const auto & config = server.config();
setResponseDefaultHeaders(response, config.getUInt("keep_alive_timeout", 10));
const char * data = "Ok.\n";
response.sendBuffer(data, strlen(data));
}
catch (...)
{
tryLogCurrentException("PingRequestHandler");
}
}
}

View File

@ -1,27 +0,0 @@
#pragma once
#include "IServer.h"
#include <Poco/Net/HTTPRequestHandler.h>
namespace DB
{
/// Response with "Ok.\n". Used for availability checks.
class PingRequestHandler : public Poco::Net::HTTPRequestHandler
{
private:
IServer & server;
public:
explicit PingRequestHandler(IServer & server_) : server(server_)
{
}
void handleRequest(
Poco::Net::HTTPServerRequest & request,
Poco::Net::HTTPServerResponse & response) override;
};
}

View File

@ -18,7 +18,7 @@ private:
const PrometheusMetricsWriter & metrics_writer; const PrometheusMetricsWriter & metrics_writer;
public: public:
explicit PrometheusRequestHandler(IServer & server_, PrometheusMetricsWriter & metrics_writer_) explicit PrometheusRequestHandler(IServer & server_, const PrometheusMetricsWriter & metrics_writer_)
: server(server_) : server(server_)
, metrics_writer(metrics_writer_) , metrics_writer(metrics_writer_)
{ {
@ -29,33 +29,4 @@ public:
Poco::Net::HTTPServerResponse & response) override; Poco::Net::HTTPServerResponse & response) override;
}; };
template <typename HandlerType>
class PrometheusRequestHandlerFactory : public Poco::Net::HTTPRequestHandlerFactory
{
private:
IServer & server;
std::string endpoint_path;
PrometheusMetricsWriter metrics_writer;
public:
PrometheusRequestHandlerFactory(IServer & server_, const AsynchronousMetrics & async_metrics_)
: server(server_)
, endpoint_path(server_.config().getString("prometheus.endpoint", "/metrics"))
, metrics_writer(server_.config(), "prometheus", async_metrics_)
{
}
Poco::Net::HTTPRequestHandler * createRequestHandler(const Poco::Net::HTTPServerRequest & request) override
{
if (request.getMethod() == Poco::Net::HTTPRequest::HTTP_GET
&& request.getURI() == endpoint_path)
return new HandlerType(server, metrics_writer);
return nullptr;
}
};
using PrometheusHandlerFactory = PrometheusRequestHandlerFactory<PrometheusRequestHandler>;
} }

View File

@ -17,7 +17,7 @@ private:
Context & context; Context & context;
public: public:
explicit ReplicasStatusHandler(IServer & server); explicit ReplicasStatusHandler(IServer & server_);
void handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response) override; void handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response) override;
}; };

View File

@ -1,33 +0,0 @@
#include "RootRequestHandler.h"
#include <IO/HTTPCommon.h>
#include <Common/Exception.h>
#include <Poco/Net/HTTPServerRequest.h>
#include <Poco/Net/HTTPServerResponse.h>
namespace DB
{
void RootRequestHandler::handleRequest(
Poco::Net::HTTPServerRequest &,
Poco::Net::HTTPServerResponse & response)
{
try
{
const auto & config = server.config();
setResponseDefaultHeaders(response, config.getUInt("keep_alive_timeout", 10));
response.setContentType("text/html; charset=UTF-8");
const std::string data = config.getString("http_server_default_response", "Ok.\n");
response.sendBuffer(data.data(), data.size());
}
catch (...)
{
tryLogCurrentException("RootRequestHandler");
}
}
}

View File

@ -1,27 +0,0 @@
#pragma once
#include "IServer.h"
#include <Poco/Net/HTTPRequestHandler.h>
namespace DB
{
/// Response with custom string. Can be used for browser.
class RootRequestHandler : public Poco::Net::HTTPRequestHandler
{
private:
IServer & server;
public:
explicit RootRequestHandler(IServer & server_) : server(server_)
{
}
void handleRequest(
Poco::Net::HTTPServerRequest & request,
Poco::Net::HTTPServerResponse & response) override;
};
}

View File

@ -806,15 +806,9 @@ int Server::main(const std::vector<std::string> & /*args*/)
auto address = socket_bind_listen(socket, listen_host, port); auto address = socket_bind_listen(socket, listen_host, port);
socket.setReceiveTimeout(settings.http_receive_timeout); socket.setReceiveTimeout(settings.http_receive_timeout);
socket.setSendTimeout(settings.http_send_timeout); socket.setSendTimeout(settings.http_send_timeout);
auto handler_factory = createDefaultHandlerFatory<HTTPHandler>(*this, "HTTPHandler-factory");
if (config().has("prometheus") && config().getInt("prometheus.port", 0) == 0)
handler_factory->addHandler<PrometheusHandlerFactory>(async_metrics);
servers.emplace_back(std::make_unique<Poco::Net::HTTPServer>( servers.emplace_back(std::make_unique<Poco::Net::HTTPServer>(
handler_factory, createHandlerFactory(*this, async_metrics, "HTTPHandler-factory"), server_pool, socket, http_params));
server_pool,
socket,
http_params));
LOG_INFO(log, "Listening for http://" + address.toString()); LOG_INFO(log, "Listening for http://" + address.toString());
}); });
@ -828,10 +822,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
socket.setReceiveTimeout(settings.http_receive_timeout); socket.setReceiveTimeout(settings.http_receive_timeout);
socket.setSendTimeout(settings.http_send_timeout); socket.setSendTimeout(settings.http_send_timeout);
servers.emplace_back(std::make_unique<Poco::Net::HTTPServer>( servers.emplace_back(std::make_unique<Poco::Net::HTTPServer>(
createDefaultHandlerFatory<HTTPHandler>(*this, "HTTPSHandler-factory"), createHandlerFactory(*this, async_metrics, "HTTPSHandler-factory"), server_pool, socket, http_params));
server_pool,
socket,
http_params));
LOG_INFO(log, "Listening for https://" + address.toString()); LOG_INFO(log, "Listening for https://" + address.toString());
#else #else
@ -886,10 +877,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
socket.setReceiveTimeout(settings.http_receive_timeout); socket.setReceiveTimeout(settings.http_receive_timeout);
socket.setSendTimeout(settings.http_send_timeout); socket.setSendTimeout(settings.http_send_timeout);
servers.emplace_back(std::make_unique<Poco::Net::HTTPServer>( servers.emplace_back(std::make_unique<Poco::Net::HTTPServer>(
createDefaultHandlerFatory<InterserverIOHTTPHandler>(*this, "InterserverIOHTTPHandler-factory"), createHandlerFactory(*this, async_metrics, "InterserverIOHTTPHandler-factory"), server_pool, socket, http_params));
server_pool,
socket,
http_params));
LOG_INFO(log, "Listening for replica communication (interserver): http://" + address.toString()); LOG_INFO(log, "Listening for replica communication (interserver): http://" + address.toString());
}); });
@ -902,10 +890,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
socket.setReceiveTimeout(settings.http_receive_timeout); socket.setReceiveTimeout(settings.http_receive_timeout);
socket.setSendTimeout(settings.http_send_timeout); socket.setSendTimeout(settings.http_send_timeout);
servers.emplace_back(std::make_unique<Poco::Net::HTTPServer>( servers.emplace_back(std::make_unique<Poco::Net::HTTPServer>(
createDefaultHandlerFatory<InterserverIOHTTPHandler>(*this, "InterserverIOHTTPHandler-factory"), createHandlerFactory(*this, async_metrics, "InterserverIOHTTPSHandler-factory"), server_pool, socket, http_params));
server_pool,
socket,
http_params));
LOG_INFO(log, "Listening for secure replica communication (interserver): https://" + address.toString()); LOG_INFO(log, "Listening for secure replica communication (interserver): https://" + address.toString());
#else #else
@ -937,13 +922,8 @@ int Server::main(const std::vector<std::string> & /*args*/)
auto address = socket_bind_listen(socket, listen_host, port); auto address = socket_bind_listen(socket, listen_host, port);
socket.setReceiveTimeout(settings.http_receive_timeout); socket.setReceiveTimeout(settings.http_receive_timeout);
socket.setSendTimeout(settings.http_send_timeout); socket.setSendTimeout(settings.http_send_timeout);
auto handler_factory = new HTTPRequestHandlerFactoryMain(*this, "PrometheusHandler-factory");
handler_factory->addHandler<PrometheusHandlerFactory>(async_metrics);
servers.emplace_back(std::make_unique<Poco::Net::HTTPServer>( servers.emplace_back(std::make_unique<Poco::Net::HTTPServer>(
handler_factory, createHandlerFactory(*this, async_metrics, "PrometheusHandler-factory"), server_pool, socket, http_params));
server_pool,
socket,
http_params));
LOG_INFO(log, "Listening for Prometheus: http://" + address.toString()); LOG_INFO(log, "Listening for Prometheus: http://" + address.toString());
}); });

View File

@ -0,0 +1,168 @@
#include "StaticRequestHandler.h"
#include "HTTPHandlerFactory.h"
#include "HTTPHandlerRequestFilter.h"
#include <IO/HTTPCommon.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/WriteBufferFromString.h>
#include <IO/copyData.h>
#include <IO/WriteHelpers.h>
#include <IO/WriteBufferFromHTTPServerResponse.h>
#include <Common/Exception.h>
#include <Poco/Path.h>
#include <Poco/File.h>
#include <Poco/Net/HTTPServerRequest.h>
#include <Poco/Net/HTTPServerResponse.h>
#include <Poco/Net/HTTPRequestHandlerFactory.h>
namespace DB
{
namespace ErrorCodes
{
extern const int INCORRECT_FILE_NAME;
extern const int HTTP_LENGTH_REQUIRED;
extern const int INVALID_CONFIG_PARAMETER;
}
static inline WriteBufferPtr responseWriteBuffer(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response, unsigned int keep_alive_timeout)
{
/// The client can pass a HTTP header indicating supported compression method (gzip or deflate).
String http_response_compression_methods = request.get("Accept-Encoding", "");
CompressionMethod http_response_compression_method = CompressionMethod::None;
if (!http_response_compression_methods.empty())
{
/// If client supports brotli - it's preferred.
/// Both gzip and deflate are supported. If the client supports both, gzip is preferred.
/// NOTE parsing of the list of methods is slightly incorrect.
if (std::string::npos != http_response_compression_methods.find("br"))
http_response_compression_method = CompressionMethod::Brotli;
else if (std::string::npos != http_response_compression_methods.find("gzip"))
http_response_compression_method = CompressionMethod::Gzip;
else if (std::string::npos != http_response_compression_methods.find("deflate"))
http_response_compression_method = CompressionMethod::Zlib;
}
bool client_supports_http_compression = http_response_compression_method != CompressionMethod::None;
return std::make_shared<WriteBufferFromHTTPServerResponse>(
request, response, keep_alive_timeout, client_supports_http_compression, http_response_compression_method);
}
static inline void trySendExceptionToClient(
const std::string & s, int exception_code,
Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response , WriteBuffer & out)
{
try
{
response.set("X-ClickHouse-Exception-Code", toString<int>(exception_code));
/// If HTTP method is POST and Keep-Alive is turned on, we should read the whole request body
/// to avoid reading part of the current request body in the next request.
if (request.getMethod() == Poco::Net::HTTPRequest::HTTP_POST
&& response.getKeepAlive() && !request.stream().eof() && exception_code != ErrorCodes::HTTP_LENGTH_REQUIRED)
request.stream().ignore(std::numeric_limits<std::streamsize>::max());
response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_INTERNAL_SERVER_ERROR);
if (!response.sent())
response.send() << s << std::endl;
else
{
if (out.count() != out.offset())
out.position() = out.buffer().begin();
writeString(s, out);
writeChar('\n', out);
out.next();
out.finalize();
}
}
catch (...)
{
tryLogCurrentException("StaticRequestHandler", "Cannot send exception to client");
}
}
void StaticRequestHandler::handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response)
{
auto keep_alive_timeout = server.config().getUInt("keep_alive_timeout", 10);
const auto & out = responseWriteBuffer(request, response, keep_alive_timeout);
try
{
response.setContentType(content_type);
if (request.getVersion() == Poco::Net::HTTPServerRequest::HTTP_1_1)
response.setChunkedTransferEncoding(true);
/// Workaround. Poco does not detect 411 Length Required case.
if (request.getMethod() == Poco::Net::HTTPRequest::HTTP_POST && !request.getChunkedTransferEncoding() && !request.hasContentLength())
throw Exception("The Transfer-Encoding is not chunked and there is no Content-Length header for POST request", ErrorCodes::HTTP_LENGTH_REQUIRED);
setResponseDefaultHeaders(response, keep_alive_timeout);
response.setStatusAndReason(Poco::Net::HTTPResponse::HTTPStatus(status));
writeResponse(*out);
}
catch (...)
{
tryLogCurrentException("StaticRequestHandler");
int exception_code = getCurrentExceptionCode();
std::string exception_message = getCurrentExceptionMessage(false, true);
trySendExceptionToClient(exception_message, exception_code, request, response, *out);
}
}
void StaticRequestHandler::writeResponse(WriteBuffer & out)
{
static const String file_prefix = "file://";
static const String config_prefix = "config://";
if (startsWith(response_expression, file_prefix))
{
const auto & user_files_absolute_path = Poco::Path(server.context().getUserFilesPath()).makeAbsolute().makeDirectory().toString();
const auto & file_name = response_expression.substr(file_prefix.size(), response_expression.size() - file_prefix.size());
const auto & file_path = Poco::Path(user_files_absolute_path, file_name).makeAbsolute().toString();
if (!Poco::File(file_path).exists())
throw Exception("Invalid file name " + file_path + " for static HTTPHandler. ", ErrorCodes::INCORRECT_FILE_NAME);
ReadBufferFromFile in(file_path);
copyData(in, out);
}
else if (startsWith(response_expression, config_prefix))
{
if (response_expression.size() <= config_prefix.size())
throw Exception( "Static handling rule handler must contain a complete configuration path, for example: config://config_key",
ErrorCodes::INVALID_CONFIG_PARAMETER);
const auto & config_path = response_expression.substr(config_prefix.size(), response_expression.size() - config_prefix.size());
writeString(server.config().getRawString(config_path, "Ok.\n"), out);
}
else
writeString(response_expression, out);
}
StaticRequestHandler::StaticRequestHandler(IServer & server_, const String & expression, int status_, const String & content_type_)
: server(server_), status(status_), content_type(content_type_), response_expression(expression)
{
}
Poco::Net::HTTPRequestHandlerFactory * createStaticHandlerFactory(IServer & server, const std::string & config_prefix)
{
int status = server.config().getInt(config_prefix + ".handler.status", 200);
std::string response_content = server.config().getRawString(config_prefix + ".handler.response_content", "Ok.\n");
std::string response_content_type = server.config().getString(config_prefix + ".handler.content_type", "text/plain; charset=UTF-8");
return addFiltersFromConfig(new HandlingRuleHTTPHandlerFactory<StaticRequestHandler>(
server, std::move(response_content), std::move(status), std::move(response_content_type)), server.config(), config_prefix);
}
}

View File

@ -0,0 +1,30 @@
#pragma once
#include "IServer.h"
#include <Poco/Net/HTTPRequestHandler.h>
#include <Common/StringUtils/StringUtils.h>
namespace DB
{
/// Response with custom string. Can be used for browser.
class StaticRequestHandler : public Poco::Net::HTTPRequestHandler
{
private:
IServer & server;
int status;
String content_type;
String response_expression;
public:
StaticRequestHandler(IServer & server, const String & expression, int status_ = 200, const String & content_type_ = "text/html; charset=UTF-8");
void writeResponse(WriteBuffer & out);
void handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response) override;
};
}

View File

@ -535,7 +535,6 @@
--> -->
<format_schema_path>/var/lib/clickhouse/format_schemas/</format_schema_path> <format_schema_path>/var/lib/clickhouse/format_schemas/</format_schema_path>
<!-- Uncomment to use query masking rules. <!-- Uncomment to use query masking rules.
name - name for the rule (optional) name - name for the rule (optional)
regexp - RE2 compatible regular expression (mandatory) regexp - RE2 compatible regular expression (mandatory)
@ -549,6 +548,50 @@
</query_masking_rules> </query_masking_rules>
--> -->
<!-- Uncomment to use custom http handlers.
rules are checked from top to bottom, first match runs the handler
url - to match request URL, you can use 'regex:' prefix to use regex match(optional)
methods - to match request method, you can use commas to separate multiple method matches(optional)
headers - to match request headers, match each child element(child element name is header name), you can use 'regex:' prefix to use regex match(optional)
handler is request handler
type - supported types: static, dynamic_query_handler, predefined_query_handler
query - use with predefined_query_handler type, executes query when the handler is called
query_param_name - use with dynamic_query_handler type, extracts and executes the value corresponding to the <query_param_name> value in HTTP request params
status - use with static type, response status code
content_type - use with static type, response content-type
response_content - use with static type, Response content sent to client, when using the prefix 'file://' or 'config://', find the content from the file or configuration send to client.
<http_handlers>
<rule>
<url>/</url>
<methods>POST,GET</methods>
<headers><pragma>no-cache</pragma></headers>
<handler>
<type>dynamic_query_handler</type>
<query_param_name>query</query_param_name>
</handler>
</rule>
<rule>
<url>/predefined_query</url>
<methods>POST,GET</methods>
<handler>
<type>predefined_query_handler</type>
<query>SELECT * FROM system.settings</query>
</handler>
</rule>
<rule>
<handler>
<type>static</type>
<status>200</status>
<content_type>text/plain; charset=UTF-8</content_type>
<response_content>config://http_server_default_response</response_content>
</handler>
</rule>
</http_handlers>
-->
<!-- Uncomment to disable ClickHouse internal DNS caching. --> <!-- Uncomment to disable ClickHouse internal DNS caching. -->
<!-- <disable_internal_dns_cache>1</disable_internal_dns_cache> --> <!-- <disable_internal_dns_cache>1</disable_internal_dns_cache> -->
</yandex> </yandex>

View File

@ -18,11 +18,10 @@ SRCS(
MySQLHandler.cpp MySQLHandler.cpp
MySQLHandlerFactory.cpp MySQLHandlerFactory.cpp
NotFoundHandler.cpp NotFoundHandler.cpp
PingRequestHandler.cpp
PrometheusMetricsWriter.cpp PrometheusMetricsWriter.cpp
PrometheusRequestHandler.cpp PrometheusRequestHandler.cpp
ReplicasStatusHandler.cpp ReplicasStatusHandler.cpp
RootRequestHandler.cpp StaticRequestHandler.cpp
Server.cpp Server.cpp
TCPHandler.cpp TCPHandler.cpp
) )

View File

@ -0,0 +1,49 @@
#pragma once
#include <Core/Names.h>
#include <Parsers/IAST.h>
#include <Parsers/ASTQueryParameter.h>
#include <Parsers/ParserQuery.h>
#include <Parsers/parseQuery.h>
namespace DB
{
class QueryParameterVisitor
{
public:
QueryParameterVisitor(NameSet & parameters_name) : query_parameters(parameters_name) {}
void visit(const ASTPtr & ast)
{
for (const auto & child : ast->children)
{
if (const auto & query_parameter = child->as<ASTQueryParameter>())
visitQueryParameter(*query_parameter);
else
visit(child);
}
}
private:
NameSet & query_parameters;
void visitQueryParameter(const ASTQueryParameter & query_parameter)
{
query_parameters.insert(query_parameter.name);
}
};
NameSet analyzeReceiveQueryParams(const std::string & query)
{
NameSet query_params;
const char * query_begin = query.data();
const char * query_end = query.data() + query.size();
ParserQuery parser(query_end, false);
ASTPtr extract_query_ast = parseQuery(parser, query_begin, query_end, "analyzeReceiveQueryParams", 0, 0);
QueryParameterVisitor(query_params).visit(extract_query_ast);
return query_params;
}
}

View File

@ -11,6 +11,7 @@ import subprocess
import time import time
import urllib import urllib
import httplib import httplib
import requests
import xml.dom.minidom import xml.dom.minidom
import logging import logging
import docker import docker
@ -689,7 +690,7 @@ class ClickHouseInstance:
def http_code_and_message(): def http_code_and_message():
return str(open_result.getcode()) + " " + httplib.responses[open_result.getcode()] + ": " + open_result.read() return str(open_result.getcode()) + " " + httplib.responses[open_result.getcode()] + ": " + open_result.read()
if expect_fail_and_get_error: if expect_fail_and_get_error:
if open_result.getcode() == 200: if open_result.getcode() == 200:
raise Exception("ClickHouse HTTP server is expected to fail, but succeeded: " + open_result.read()) raise Exception("ClickHouse HTTP server is expected to fail, but succeeded: " + open_result.read())
@ -699,6 +700,11 @@ class ClickHouseInstance:
raise Exception("ClickHouse HTTP server returned " + http_code_and_message()) raise Exception("ClickHouse HTTP server returned " + http_code_and_message())
return open_result.read() return open_result.read()
# Connects to the instance via HTTP interface, sends a query and returns the answer
def http_request(self, url, method='GET', params=None, data=None, headers=None):
url = "http://" + self.ip_address + ":8123/"+url
return requests.request(method=method, url=url, params=params, data=data, headers=headers)
# Connects to the instance via HTTP interface, sends a query, expects an error and return the error message # Connects to the instance via HTTP interface, sends a query, expects an error and return the error message
def http_query_and_get_error(self, sql, data=None, params=None, user=None, password=None): def http_query_and_get_error(self, sql, data=None, params=None, user=None, password=None):
return self.http_query(sql=sql, data=data, params=params, user=user, password=password, expect_fail_and_get_error=True) return self.http_query(sql=sql, data=data, params=params, user=user, password=password, expect_fail_and_get_error=True)

View File

@ -0,0 +1,115 @@
import os
import urllib
import contextlib
from helpers.cluster import ClickHouseCluster
class SimpleCluster:
def close(self):
self.cluster.shutdown()
def __init__(self, cluster, name, config_dir):
self.cluster = cluster
self.instance = self.add_instance(name, config_dir)
cluster.start()
def add_instance(self, name, config_dir):
script_path = os.path.dirname(os.path.realpath(__file__))
return self.cluster.add_instance(name, main_configs=[os.path.join(script_path, config_dir, 'config.xml')])
def test_dynamic_query_handler():
with contextlib.closing(SimpleCluster(ClickHouseCluster(__file__), "dynamic_handler", "test_dynamic_handler")) as cluster:
test_query = urllib.quote_plus('SELECT * FROM system.settings WHERE name = \'max_threads\'')
assert 404 == cluster.instance.http_request('?max_threads=1', method='GET', headers={'XXX': 'xxx'}).status_code
assert 404 == cluster.instance.http_request('test_dynamic_handler_get?max_threads=1', method='POST', headers={'XXX': 'xxx'}).status_code
assert 404 == cluster.instance.http_request('test_dynamic_handler_get?max_threads=1', method='GET', headers={'XXX': 'bad'}).status_code
assert 400 == cluster.instance.http_request('test_dynamic_handler_get?max_threads=1', method='GET', headers={'XXX': 'xxx'}).status_code
assert 200 == cluster.instance.http_request('test_dynamic_handler_get?max_threads=1&get_dynamic_handler_query=' + test_query,
method='GET', headers={'XXX': 'xxx'}).status_code
def test_predefined_query_handler():
with contextlib.closing(SimpleCluster(ClickHouseCluster(__file__), "predefined_handler", "test_predefined_handler")) as cluster:
assert 404 == cluster.instance.http_request('?max_threads=1', method='GET', headers={'XXX': 'xxx'}).status_code
assert 404 == cluster.instance.http_request('test_predefined_handler_get?max_threads=1', method='GET', headers={'XXX': 'bad'}).status_code
assert 404 == cluster.instance.http_request('test_predefined_handler_get?max_threads=1', method='POST', headers={'XXX': 'xxx'}).status_code
assert 500 == cluster.instance.http_request('test_predefined_handler_get?max_threads=1', method='GET', headers={'XXX': 'xxx'}).status_code
assert 'max_threads\t1\n' == cluster.instance.http_request('test_predefined_handler_get?max_threads=1&setting_name=max_threads', method='GET', headers={'XXX': 'xxx'}).content
assert 'max_threads\t1\nmax_alter_threads\t1\n' == cluster.instance.http_request(
'query_param_with_url/max_threads?max_threads=1&max_alter_threads=1', headers={'XXX': 'max_alter_threads'}).content
def test_fixed_static_handler():
with contextlib.closing(SimpleCluster(ClickHouseCluster(__file__), "static_handler", "test_static_handler")) as cluster:
assert 404 == cluster.instance.http_request('', method='GET', headers={'XXX': 'xxx'}).status_code
assert 404 == cluster.instance.http_request('test_get_fixed_static_handler', method='GET', headers={'XXX': 'bad'}).status_code
assert 404 == cluster.instance.http_request('test_get_fixed_static_handler', method='POST', headers={'XXX': 'xxx'}).status_code
assert 402 == cluster.instance.http_request('test_get_fixed_static_handler', method='GET', headers={'XXX': 'xxx'}).status_code
assert 'text/html; charset=UTF-8' == cluster.instance.http_request('test_get_fixed_static_handler', method='GET', headers={'XXX': 'xxx'}).headers['Content-Type']
assert 'Test get static handler and fix content' == cluster.instance.http_request('test_get_fixed_static_handler', method='GET', headers={'XXX': 'xxx'}).content
def test_config_static_handler():
with contextlib.closing(SimpleCluster(ClickHouseCluster(__file__), "static_handler", "test_static_handler")) as cluster:
assert 404 == cluster.instance.http_request('', method='GET', headers={'XXX': 'xxx'}).status_code
assert 404 == cluster.instance.http_request('test_get_config_static_handler', method='GET', headers={'XXX': 'bad'}).status_code
assert 404 == cluster.instance.http_request('test_get_config_static_handler', method='POST', headers={'XXX': 'xxx'}).status_code
# check default status code
assert 200 == cluster.instance.http_request('test_get_config_static_handler', method='GET', headers={'XXX': 'xxx'}).status_code
assert 'text/plain; charset=UTF-8' == cluster.instance.http_request('test_get_config_static_handler', method='GET', headers={'XXX': 'xxx'}).headers['Content-Type']
assert 'Test get static handler and config content' == cluster.instance.http_request('test_get_config_static_handler', method='GET', headers={'XXX': 'xxx'}).content
def test_absolute_path_static_handler():
with contextlib.closing(SimpleCluster(ClickHouseCluster(__file__), "static_handler", "test_static_handler")) as cluster:
cluster.instance.exec_in_container(
['bash', '-c', 'echo "<html><body>Absolute Path File</body></html>" > /var/lib/clickhouse/user_files/absolute_path_file.html'],
privileged=True, user='root')
assert 404 == cluster.instance.http_request('', method='GET', headers={'XXX': 'xxx'}).status_code
assert 404 == cluster.instance.http_request('test_get_absolute_path_static_handler', method='GET', headers={'XXX': 'bad'}).status_code
assert 404 == cluster.instance.http_request('test_get_absolute_path_static_handler', method='POST', headers={'XXX': 'xxx'}).status_code
# check default status code
assert 200 == cluster.instance.http_request('test_get_absolute_path_static_handler', method='GET', headers={'XXX': 'xxx'}).status_code
assert 'text/html; charset=UTF-8' == cluster.instance.http_request('test_get_absolute_path_static_handler', method='GET', headers={'XXX': 'xxx'}).headers['Content-Type']
assert '<html><body>Absolute Path File</body></html>\n' == cluster.instance.http_request('test_get_absolute_path_static_handler', method='GET', headers={'XXX': 'xxx'}).content
def test_relative_path_static_handler():
with contextlib.closing(SimpleCluster(ClickHouseCluster(__file__), "static_handler", "test_static_handler")) as cluster:
cluster.instance.exec_in_container(
['bash', '-c', 'echo "<html><body>Relative Path File</body></html>" > /var/lib/clickhouse/user_files/relative_path_file.html'],
privileged=True, user='root')
assert 404 == cluster.instance.http_request('', method='GET', headers={'XXX': 'xxx'}).status_code
assert 404 == cluster.instance.http_request('test_get_relative_path_static_handler', method='GET', headers={'XXX': 'bad'}).status_code
assert 404 == cluster.instance.http_request('test_get_relative_path_static_handler', method='POST', headers={'XXX': 'xxx'}).status_code
# check default status code
assert 200 == cluster.instance.http_request('test_get_relative_path_static_handler', method='GET', headers={'XXX': 'xxx'}).status_code
assert 'text/html; charset=UTF-8' == cluster.instance.http_request('test_get_relative_path_static_handler', method='GET', headers={'XXX': 'xxx'}).headers['Content-Type']
assert '<html><body>Relative Path File</body></html>\n' == cluster.instance.http_request('test_get_relative_path_static_handler', method='GET', headers={'XXX': 'xxx'}).content

View File

@ -0,0 +1,15 @@
<?xml version="1.0"?>
<yandex>
<http_handlers>
<rule>
<headers><XXX>xxx</XXX></headers>
<methods>GET</methods>
<url>/test_dynamic_handler_get</url>
<handler>
<type>dynamic_query_handler</type>
<query_param_name>get_dynamic_handler_query</query_param_name>
</handler>
</rule>
</http_handlers>
</yandex>

View File

@ -0,0 +1,25 @@
<?xml version="1.0"?>
<yandex>
<http_handlers>
<rule>
<methods>GET</methods>
<headers><XXX>xxx</XXX></headers>
<url>/test_predefined_handler_get</url>
<handler>
<type>predefined_query_handler</type>
<query>SELECT name, value FROM system.settings WHERE name = {setting_name:String}</query>
</handler>
</rule>
<rule>
<url><![CDATA[regex:/query_param_with_url/(?P<setting_name_1>[^/]+)]]></url>
<headers>
<XXX><![CDATA[regex:(?P<setting_name_2>.+)]]></XXX>
</headers>
<handler>
<type>predefined_query_handler</type>
<query>SELECT name, value FROM system.settings WHERE name = {setting_name_1:String} OR name = {setting_name_2:String}</query>
</handler>
</rule>
</http_handlers>
</yandex>

View File

@ -0,0 +1,52 @@
<?xml version="1.0"?>
<yandex>
<test_get_config_static_handler>Test get static handler and config content</test_get_config_static_handler>
<http_handlers>
<rule>
<methods>GET</methods>
<headers><XXX>xxx</XXX></headers>
<url>/test_get_fixed_static_handler</url>
<handler>
<type>static</type>
<status>402</status>
<content_type>text/html; charset=UTF-8</content_type>
<response_content>Test get static handler and fix content</response_content>
</handler>
</rule>
<rule>
<methods>GET</methods>
<headers><XXX>xxx</XXX></headers>
<url>/test_get_config_static_handler</url>
<handler>
<type>static</type>
<response_content>config://test_get_config_static_handler</response_content>
</handler>
</rule>
<rule>
<methods>GET</methods>
<headers><XXX>xxx</XXX></headers>
<url>/test_get_absolute_path_static_handler</url>
<handler>
<type>static</type>
<content_type>text/html; charset=UTF-8</content_type>
<response_content>file:///absolute_path_file.html</response_content>
</handler>
</rule>
<rule>
<methods>GET</methods>
<headers><XXX>xxx</XXX></headers>
<url>/test_get_relative_path_static_handler</url>
<handler>
<type>static</type>
<content_type>text/html; charset=UTF-8</content_type>
<response_content>file://./relative_path_file.html</response_content>
</handler>
</rule>
</http_handlers>
</yandex>