ISSUES-5436 fix integration test failure & add test

This commit is contained in:
zhang2014 2020-04-21 19:30:45 +08:00
parent 318ab3b51e
commit 0070f75218
27 changed files with 326 additions and 292 deletions

View File

@ -1,97 +0,0 @@
import os
import urllib
import contextlib
from helpers.cluster import ClickHouseCluster
class SimpleCluster:
def close(self):
self.cluster.shutdown()
def __init__(self, cluster, name, config_dir):
self.cluster = cluster
self.instance = self.add_instance(name, config_dir)
cluster.start()
def add_instance(self, name, config_dir):
script_path = os.path.dirname(os.path.realpath(__file__))
return self.cluster.add_instance(name, config_dir=os.path.join(script_path, config_dir),
main_configs=[os.path.join(script_path, 'common_configs', 'common_config.xml')],
user_configs=[os.path.join(script_path, 'common_configs', 'common_users.xml')])
def test_dynamic_query_handler_with_insert_and_select():
with contextlib.closing(SimpleCluster(ClickHouseCluster(__file__), "dynamic_insert_and_select", "test_insert_and_select_dynamic")) as cluster:
insert_data_query = urllib.quote_plus('INSERT INTO test.test VALUES')
select_data_query = urllib.quote_plus('SELECT * FROM test.test ORDER BY id')
create_database_query = urllib.quote_plus('CREATE DATABASE test')
create_test_table_query = 'CREATE TABLE test.test (id UInt8) Engine = Memory'
assert cluster.instance.http_request('create_test_table?max_threads=1&test_create_query_param=' + create_database_query, method='PUT') == ''
assert cluster.instance.http_request('create_test_table?max_threads=1', method='PUT', data=create_test_table_query) == ''
assert cluster.instance.http_request('insert_data_to_test?max_threads=1&test_insert_query_param=' + insert_data_query + '(1)', method='POST') == ''
assert cluster.instance.http_request('insert_data_to_test?max_threads=1&test_insert_query_param=' + insert_data_query, method='POST', data='(2)') == ''
assert cluster.instance.http_request('insert_data_to_test?max_threads=1', method='POST', data='INSERT INTO test.test VALUES(3)(4)') == ''
assert cluster.instance.http_request('query_data_from_test?max_threads=1&test_select_query_param=' + select_data_query, method='GET') == '1\n2\n3\n4\n'
def test_predefined_query_handler_with_insert_and_select():
with contextlib.closing(SimpleCluster(ClickHouseCluster(__file__), "predefined_insert_and_select", "test_insert_and_select_predefined")) as cluster:
assert cluster.instance.http_request('create_test_table?max_threads=1', method='PUT') == ''
assert cluster.instance.http_request('insert_data_to_test?max_threads=1', method='POST', data='(1)(2)(3)(4)') == ''
assert cluster.instance.http_request('query_data_from_test?max_threads=1', method='GET') == '1\n2\n3\n4\n'
def test_dynamic_query_handler_with_params_and_settings():
with contextlib.closing(SimpleCluster(ClickHouseCluster(__file__), "dynamic_params_and_settings", "test_param_and_settings_dynamic")) as cluster:
settings = 'max_threads=1&max_alter_threads=2'
query_param = 'param_name_1=max_threads&param_name_2=max_alter_threads'
test_query = 'SELECT value FROM system.settings where name = {name_1:String} OR name = {name_2:String}'
quoted_test_query = urllib.quote_plus(test_query)
assert cluster.instance.http_request('post_query_params_and_settings?post_query_param=' + quoted_test_query + '&' + query_param + '&' + settings, method='POST') == '1\n2\n'
assert cluster.instance.http_request('post_query_params_and_settings?' + query_param + '&' + settings, method='POST', data=test_query) == '1\n2\n'
assert 'Syntax error' in cluster.instance.http_request('post_query_params_and_settings?post_query_param=' + test_query + '&' + settings, method='POST', data=query_param)
assert 'Syntax error' in cluster.instance.http_request('post_query_params_and_settings?post_query_param=' + test_query + '&' + query_param, method='POST', data=settings)
assert cluster.instance.http_request('get_query_params_and_settings?get_query_param=' + quoted_test_query + '&' + query_param + '&' + settings) == '1\n2\n'
assert cluster.instance.http_request('query_param_with_url/123/max_threads?query_param=' + quoted_test_query + '&' + settings + '&param_name_2=max_alter_threads') == '1\n2\n'
assert cluster.instance.http_request('query_param_with_url/123/max_threads/max_alter_threads?query_param=' + quoted_test_query + '&' + settings) == '1\n2\n'
assert '`name_2` is not set' in cluster.instance.http_request('query_param_with_url/123/max_threads?query_param=' + quoted_test_query + '&' + settings)
assert 'Duplicate name' in cluster.instance.http_request('query_param_with_url/123/max_threads_dump/max_alter_threads_dump?query_param=' + quoted_test_query + '&' + query_param + '&' + settings)
assert cluster.instance.http_request('test_match_headers?query_param=' + quoted_test_query + '&' + settings, headers={'XXX': 'TEST_HEADER_VALUE', 'PARAMS_XXX': 'max_threads/max_alter_threads'}) == '1\n2\n'
assert cluster.instance.http_request('test_match_headers?query_param=' + quoted_test_query + '&' + settings + '&param_name_2=max_alter_threads', headers={'XXX': 'TEST_HEADER_VALUE', 'PARAMS_XXX': 'max_threads'}) == '1\n2\n'
assert '`name_2` is not set' in cluster.instance.http_request('test_match_headers?query_param=' + quoted_test_query + '&' + settings, headers={'XXX': 'TEST_HEADER_VALUE', 'PARAMS_XXX': 'max_threads'})
assert 'There is no handle /test_match_headers' in cluster.instance.http_request('test_match_headers?query_param=' + quoted_test_query + '&' + settings)
def test_predefined_query_handler_with_params_and_settings():
with contextlib.closing(SimpleCluster(ClickHouseCluster(__file__), "predefined_params_and_settings", "test_param_and_settings_predefined")) as cluster:
settings = 'max_threads=1&max_alter_threads=2'
query_param = 'name_1=max_threads&name_2=max_alter_threads'
assert cluster.instance.http_request('get_query_params_and_settings?' + query_param + '&' + settings, method='GET') == '1\nmax_alter_threads\t2\n'
assert cluster.instance.http_request('query_param_with_url/123/max_threads/max_alter_threads?' + settings) == '1\nmax_alter_threads\t2\n'
assert cluster.instance.http_request('query_param_with_url/123/max_threads?' + settings + '&name_2=max_alter_threads') == '1\nmax_alter_threads\t2\n'
assert '`name_2` is not set' in cluster.instance.http_request('query_param_with_url/123/max_threads?' + settings)
assert 'Duplicate name' in cluster.instance.http_request('query_param_with_url/123/max_threads_dump/max_alter_threads_dump?' + query_param + '&' + settings)
assert cluster.instance.http_request('post_query_params_and_settings?' + query_param, method='POST', data=settings) == '1\nmax_alter_threads\t2\n'
assert cluster.instance.http_request('post_query_params_and_settings?' + settings, method='POST', data=query_param) == '1\nmax_alter_threads\t2\n'
assert cluster.instance.http_request('post_query_params_and_settings?' + query_param + '&' + settings, method='POST') == '1\nmax_alter_threads\t2\n'
assert cluster.instance.http_request('post_query_params_and_settings', method='POST', data=query_param + '&' + settings) == '1\nmax_alter_threads\t2\n'
assert cluster.instance.http_request('test_match_headers?' + settings, headers={'XXX': 'TEST_HEADER_VALUE', 'PARAMS_XXX': 'max_threads/max_alter_threads'}) == '1\nmax_alter_threads\t2\n'
assert cluster.instance.http_request('test_match_headers?' + settings + '&name_2=max_alter_threads', headers={'XXX': 'TEST_HEADER_VALUE', 'PARAMS_XXX': 'max_threads'}) == '1\nmax_alter_threads\t2\n'
assert '`name_2` is not set' in cluster.instance.http_request('test_match_headers?' + settings, headers={'XXX': 'TEST_HEADER_VALUE', 'PARAMS_XXX': 'max_threads'})
assert 'There is no handle /test_match_headers' in cluster.instance.http_request('test_match_headers?' + settings)
def test_other_configs():
with contextlib.closing(SimpleCluster(ClickHouseCluster(__file__), "test_other_configs", "other_tests_configs")) as cluster:
assert cluster.instance.http_request('', method='GET') == 'Ok.\n'
assert cluster.instance.http_request('ping_test', method='GET') == 'Ok.\n'
assert cluster.instance.http_request('404/NOT_FOUND', method='GET') == 'There is no handle /404/NOT_FOUND\n\ntest not found\n'
cluster.instance.query('CREATE DATABASE test')
cluster.instance.query('CREATE TABLE test.test (id UInt8) Engine = Memory')
assert cluster.instance.http_request('test_one_handler_with_insert_and_select?id=1', method='POST', data='(1)(2)') == '2\n'
assert 'Cannot parse input' in cluster.instance.http_request('test_one_handler_with_insert_and_select', method='POST', data='id=1')

View File

@ -1,23 +0,0 @@
<?xml version="1.0"?>
<yandex>
<http_handlers>
<dynamic_query_handler>
<method>PUT</method>
<url>/create_test_table</url>
<query_param_name>test_create_query_param</query_param_name>
</dynamic_query_handler>
<dynamic_query_handler>
<method>POST</method>
<url>/insert_data_to_test</url>
<query_param_name>test_insert_query_param</query_param_name>
</dynamic_query_handler>
<dynamic_query_handler>
<method>GET</method>
<url>/query_data_from_test</url>
<query_param_name>test_select_query_param</query_param_name>
</dynamic_query_handler>
</http_handlers>
</yandex>

View File

@ -1,26 +0,0 @@
<?xml version="1.0"?>
<yandex>
<http_handlers>
<predefined_query_handler>
<method>PUT</method>
<url>/create_test_table</url>
<queries>
<query>CREATE DATABASE test</query>
<query>CREATE TABLE test.test (id UInt8) Engine = Memory</query>
</queries>
</predefined_query_handler>
<predefined_query_handler>
<method>POST</method>
<url>/insert_data_to_test</url>
<queries><query>INSERT INTO test.test VALUES</query></queries>
</predefined_query_handler>
<predefined_query_handler>
<method>GET</method>
<url>/query_data_from_test</url>
<queries><query>SELECT * FROM test.test ORDER BY id</query></queries>
</predefined_query_handler>
</http_handlers>
</yandex>

View File

@ -569,7 +569,7 @@ void HTTPHandler::processQuery(
});
}
customizeContext(context);
customizeContext(request, context);
executeQuery(*in, *used_output.out_maybe_delayed_and_compressed, /* allow_into_outfile = */ false, context,
[&response] (const String & current_query_id, const String & content_type, const String & format, const String & timezone)
@ -754,8 +754,11 @@ std::string DynamicQueryHandler::getQuery(Poco::Net::HTTPServerRequest & request
return full_query;
}
PredefineQueryHandler::PredefineQueryHandler(IServer & server, const NameSet & receive_params_, const std::string & predefine_query_)
PredefineQueryHandler::PredefineQueryHandler(
IServer & server, const NameSet & receive_params_, const std::string & predefine_query_
, const std::unordered_map<String, String> & header_name_with_capture_regex_)
: HTTPHandler(server, "PredefineQueryHandler"), receive_params(receive_params_), predefine_query(predefine_query_)
, header_name_with_capture_regex(header_name_with_capture_regex_)
{
}
@ -770,6 +773,50 @@ bool PredefineQueryHandler::customizeQueryParam(Context & context, const std::st
return false;
}
void PredefineQueryHandler::customizeContext(Poco::Net::HTTPServerRequest & request, DB::Context & context)
{
/// If in the configuration file, the handler's header is regex and contains named capture group
/// We will extract regex named capture groups as query parameters
const auto & set_query_params = [&](const char * begin, const char * end, const std::string & regex)
{
auto compiled_regex = std::make_shared<re2_st::RE2>(regex);
if (!compiled_regex->ok())
throw Exception("cannot compile re2: " + regex + " for routing_rule, error: " + compiled_regex->error() +
". Look at https://github.com/google/re2/wiki/Syntax for reference.", ErrorCodes::CANNOT_COMPILE_REGEXP);
int num_captures = compiled_regex->NumberOfCapturingGroups() + 1;
re2_st::StringPiece matches[num_captures];
re2_st::StringPiece input(begin, end - begin);
if (compiled_regex->Match(input, 0, end - begin, re2_st::RE2::Anchor::ANCHOR_BOTH, matches, num_captures))
{
for (const auto & [capturing_name, capturing_index] : compiled_regex->NamedCapturingGroups())
{
const auto & capturing_value = matches[capturing_index];
if (capturing_value.data())
context.setQueryParameter(capturing_name, String(capturing_value.data(), capturing_value.size()));
}
}
};
for (const auto & [header_name, regex] : header_name_with_capture_regex)
{
if (header_name == "url")
{
const auto & uri = request.getURI();
set_query_params(uri.data(), find_first_symbols<'?'>(uri.data(), uri.data() + uri.size()), regex);
}
else
{
const auto & header_value = request.get(header_name);
set_query_params(header_value.data(), header_value.data() + header_value.size(), regex);
}
}
}
std::string PredefineQueryHandler::getQuery(Poco::Net::HTTPServerRequest & request, HTMLForm & params, Context & context)
{
if (unlikely(startsWith(request.getContentType(), "multipart/form-data")))
@ -784,8 +831,8 @@ std::string PredefineQueryHandler::getQuery(Poco::Net::HTTPServerRequest & reque
Poco::Net::HTTPRequestHandlerFactory * createDynamicHandlerFactory(IServer & server, const std::string & config_prefix)
{
const auto & query_param_name = server.config().getString(config_prefix + ".handler.query_param_name", "query");
return addFiltersFromConfig(new RoutingRuleHTTPHandlerFactory<DynamicQueryHandler>(server, query_param_name), server.config(), config_prefix);
std::string query_param_name = server.config().getString(config_prefix + ".handler.query_param_name", "query");
return addFiltersFromConfig(new RoutingRuleHTTPHandlerFactory<DynamicQueryHandler>(server, std::move(query_param_name)), server.config(), config_prefix);
}
@ -794,9 +841,41 @@ Poco::Net::HTTPRequestHandlerFactory * createPredefineHandlerFactory(IServer & s
if (!server.config().has(config_prefix + ".handler.query"))
throw Exception("There is no path '" + config_prefix + ".handler.query" + "' in configuration file.", ErrorCodes::NO_ELEMENTS_IN_CONFIG);
const auto & predefine_query = server.config().getString(config_prefix + ".handler.query");
std::string predefine_query = server.config().getString(config_prefix + ".handler.query");
NameSet analyze_receive_params = analyzeReceiveQueryParams(predefine_query);
std::unordered_map<String, String> type_and_regex;
Poco::Util::AbstractConfiguration::Keys filters_type;
server.config().keys(config_prefix, filters_type);
for (const auto & filter_type : filters_type)
{
auto expression = server.config().getString(config_prefix + "." + filter_type);
if (startsWith(expression, "regex:"))
{
expression = expression.substr(6);
auto compiled_regex = std::make_shared<re2_st::RE2>(expression);
if (!compiled_regex->ok())
throw Exception("cannot compile re2: " + expression + " for routing_rule, error: " + compiled_regex->error() +
". Look at https://github.com/google/re2/wiki/Syntax for reference.", ErrorCodes::CANNOT_COMPILE_REGEXP);
const auto & named_capturing_groups = compiled_regex->NamedCapturingGroups();
const auto & has_capturing_named_query_param = std::count_if(
named_capturing_groups.begin(), named_capturing_groups.end(), [&](const auto & iterator)
{
return std::count_if(analyze_receive_params.begin(), analyze_receive_params.end(), [&](const auto & param_name)
{
return param_name == iterator.first;
});
});
if (has_capturing_named_query_param)
type_and_regex.emplace(std::make_pair(filter_type, expression));
}
}
return addFiltersFromConfig(new RoutingRuleHTTPHandlerFactory<PredefineQueryHandler>(
server, analyzeReceiveQueryParams(predefine_query), predefine_query), server.config(), config_prefix);
server, std::move(analyze_receive_params), std::move(predefine_query), std::move(type_and_regex)), server.config(), config_prefix);
}
}

View File

@ -29,7 +29,7 @@ public:
void handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response) override;
/// This method is called right before the query execution.
virtual void customizeContext(Context & /* context */) {}
virtual void customizeContext(Poco::Net::HTTPServerRequest & request, Context & /* context */) {}
virtual bool customizeQueryParam(Context & context, const std::string & key, const std::string & value) = 0;
@ -101,12 +101,17 @@ class PredefineQueryHandler : public HTTPHandler
private:
NameSet receive_params;
std::string predefine_query;
std::unordered_map<String, String> header_name_with_capture_regex;
public:
explicit PredefineQueryHandler(IServer & server, const NameSet & receive_params, const std::string & predefine_query_);
explicit PredefineQueryHandler(
IServer & server, const NameSet & receive_params_, const std::string & predefine_query_
, const std::unordered_map<String, String> & header_name_with_capture_regex_);
virtual void customizeContext(Poco::Net::HTTPServerRequest & request, Context & context) override;
std::string getQuery(Poco::Net::HTTPServerRequest & request, HTMLForm & params, Context & context) override;
bool customizeQueryParam(Context &context, const std::string &key, const std::string &value) override;
bool customizeQueryParam(Context & context, const std::string & key, const std::string & value) override;
};
}

View File

@ -10,6 +10,7 @@
#include "StaticRequestHandler.h"
#include "ReplicasStatusHandler.h"
#include "InterserverIOHTTPHandler.h"
#include "PrometheusRequestHandler.h"
#if USE_RE2_ST
#include <re2_st/re2.h>
@ -35,17 +36,14 @@ HTTPRequestHandlerFactoryMain::HTTPRequestHandlerFactoryMain(const std::string &
Poco::Net::HTTPRequestHandler * HTTPRequestHandlerFactoryMain::createRequestHandler(const Poco::Net::HTTPServerRequest & request) // override
{
LOG_TRACE(log, "HTTP Request for " << name << ". "
<< "Method: "
<< request.getMethod()
<< ", Address: "
<< request.clientAddress().toString()
<< ", User-Agent: "
<< (request.has("User-Agent") ? request.get("User-Agent") : "none")
<< "Method: " << request.getMethod()
<< ", Address: " << request.clientAddress().toString()
<< ", User-Agent: " << (request.has("User-Agent") ? request.get("User-Agent") : "none")
<< (request.hasContentLength() ? (", Length: " + std::to_string(request.getContentLength())) : (""))
<< ", Content Type: " << request.getContentType()
<< ", Transfer Encoding: " << request.getTransferEncoding());
for (auto & handler_factory : child_handler_factories)
for (auto & handler_factory : child_factories)
{
auto handler = handler_factory->createRequestHandler(request);
if (handler != nullptr)
@ -64,13 +62,13 @@ Poco::Net::HTTPRequestHandler * HTTPRequestHandlerFactoryMain::createRequestHand
HTTPRequestHandlerFactoryMain::~HTTPRequestHandlerFactoryMain()
{
while (!child_handler_factories.empty())
delete child_handler_factories.back(), child_handler_factories.pop_back();
while (!child_factories.empty())
delete child_factories.back(), child_factories.pop_back();
}
HTTPRequestHandlerFactoryMain::TThis * HTTPRequestHandlerFactoryMain::addHandler(Poco::Net::HTTPRequestHandlerFactory * child_factory)
{
child_handler_factories.emplace_back(child_factory);
child_factories.emplace_back(child_factory);
return this;
}
@ -91,11 +89,11 @@ static inline auto createHandlersFactoryFromConfig(IServer & server, const std::
const auto & handler_type = server.config().getString(prefix + "." + key + ".handler.type", "");
if (handler_type == "static")
main_handler_factory->addHandler(createStaticHandlerFactory(server, prefix));
main_handler_factory->addHandler(createStaticHandlerFactory(server, prefix + "." + key));
else if (handler_type == "dynamic_query_handler")
main_handler_factory->addHandler(createDynamicHandlerFactory(server, prefix));
main_handler_factory->addHandler(createDynamicHandlerFactory(server, prefix + "." + key));
else if (handler_type == "predefine_query_handler")
main_handler_factory->addHandler(createPredefineHandlerFactory(server, prefix));
main_handler_factory->addHandler(createPredefineHandlerFactory(server, prefix + "." + key));
else
throw Exception("Unknown element in config: " + prefix + "." + key + ", must be 'routing_rule'", ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG);
}
@ -112,13 +110,13 @@ static inline auto createHandlersFactoryFromConfig(IServer & server, const std::
static const auto ping_response_expression = "Ok.\n";
static const auto root_response_expression = "config://http_server_default_response";
static inline Poco::Net::HTTPRequestHandlerFactory * createHTTPHandlerFactory(IServer & server, const std::string & name)
static inline Poco::Net::HTTPRequestHandlerFactory * createHTTPHandlerFactory(IServer & server, const std::string & name, AsynchronousMetrics & async_metrics)
{
if (server.config().has("routing_rules"))
return createHandlersFactoryFromConfig(server, name, "routing_rules");
else
{
return (new HTTPRequestHandlerFactoryMain(name))
auto factory = (new HTTPRequestHandlerFactoryMain(name))
->addHandler((new RoutingRuleHTTPHandlerFactory<StaticRequestHandler>(server, root_response_expression))
->attachStrictPath("/")->allowGetAndHeadRequest())
->addHandler((new RoutingRuleHTTPHandlerFactory<StaticRequestHandler>(server, ping_response_expression))
@ -126,9 +124,13 @@ static inline Poco::Net::HTTPRequestHandlerFactory * createHTTPHandlerFactory(IS
->addHandler((new RoutingRuleHTTPHandlerFactory<ReplicasStatusHandler>(server))
->attachNonStrictPath("/replicas_status")->allowGetAndHeadRequest())
->addHandler((new RoutingRuleHTTPHandlerFactory<DynamicQueryHandler>(server, "query"))->allowPostAndGetParamsRequest());
/// TODO:
// if (configuration.has("prometheus") && configuration.getInt("prometheus.port", 0) == 0)
// handler_factory->addHandler<PrometheusHandlerFactory>(async_metrics);
if (server.config().has("prometheus") && server.config().getInt("prometheus.port", 0) == 0)
factory->addHandler((new RoutingRuleHTTPHandlerFactory<PrometheusRequestHandler>(
server, PrometheusMetricsWriter(server.config(), "prometheus", async_metrics)))
->attachStrictPath(server.config().getString("prometheus.endpoint", "/metrics"))->allowGetAndHeadRequest());
return factory;
}
}
@ -144,12 +146,16 @@ static inline Poco::Net::HTTPRequestHandlerFactory * createInterserverHTTPHandle
->addHandler((new RoutingRuleHTTPHandlerFactory<InterserverIOHTTPHandler>(server))->allowPostAndGetParamsRequest());
}
Poco::Net::HTTPRequestHandlerFactory * createHandlerFactory(IServer & server, const std::string & name)
Poco::Net::HTTPRequestHandlerFactory * createHandlerFactory(IServer & server, AsynchronousMetrics & async_metrics, const std::string & name)
{
if (name == "HTTPHandler-factory" || name == "HTTPSHandler-factory")
return createHTTPHandlerFactory(server, name);
return createHTTPHandlerFactory(server, name, async_metrics);
else if (name == "InterserverIOHTTPHandler-factory" || name == "InterserverIOHTTPSHandler-factory")
return createInterserverHTTPHandlerFactory(server, name);
else if (name == "PrometheusHandler-factory")
return (new HTTPRequestHandlerFactoryMain(name))->addHandler((new RoutingRuleHTTPHandlerFactory<PrometheusRequestHandler>(
server, PrometheusMetricsWriter(server.config(), "prometheus", async_metrics)))
->attachStrictPath(server.config().getString("prometheus.endpoint", "/metrics"))->allowGetAndHeadRequest());
throw Exception("LOGICAL ERROR: Unknown HTTP handler factory name.", ErrorCodes::LOGICAL_ERROR);
}

View File

@ -2,10 +2,12 @@
#include "IServer.h"
#include <common/logger_useful.h>
#include <Common/HTMLForm.h>
#include <Common/StringUtils/StringUtils.h>
#include <Poco/Net/HTTPServerRequest.h>
#include <Poco/Net/HTTPServerResponse.h>
#include <Poco/Net/HTTPRequestHandlerFactory.h>
#include <Interpreters/AsynchronousMetrics.h>
namespace DB
{
@ -19,7 +21,7 @@ private:
Logger * log;
std::string name;
std::vector<Poco::Net::HTTPRequestHandlerFactory *> child_handler_factories;
std::vector<Poco::Net::HTTPRequestHandlerFactory *> child_factories;
public:
~HTTPRequestHandlerFactoryMain();
@ -101,12 +103,12 @@ private:
std::function<Poco::Net::HTTPRequestHandler * ()> creator;
};
Poco::Net::HTTPRequestHandlerFactory * createHandlerFactory(IServer & server, const std::string & name);
Poco::Net::HTTPRequestHandlerFactory * createStaticHandlerFactory(IServer & server, const std::string & config_prefix);
Poco::Net::HTTPRequestHandlerFactory * createDynamicHandlerFactory(IServer & server, const std::string & config_prefix);
Poco::Net::HTTPRequestHandlerFactory * createPredefineHandlerFactory(IServer & server, const std::string & config_prefix);
Poco::Net::HTTPRequestHandlerFactory * createHandlerFactory(IServer & server, AsynchronousMetrics & async_metrics, const std::string & name);
}

View File

@ -20,58 +20,82 @@ namespace DB
namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
extern const int CANNOT_COMPILE_REGEXP;
extern const int UNKNOWN_ELEMENT_IN_CONFIG;
}
static inline std::string uriPathGetter(const Poco::Net::HTTPServerRequest & request)
static inline bool checkRegexExpression(const StringRef & match_str, const StringRef & expression)
{
const auto & uri = request.getURI();
const auto & end = find_first_symbols<'?'>(uri.data(), uri.data() + uri.size());
re2_st::StringPiece regex(expression.data, expression.size);
return std::string(uri.data(), end - uri.data());
}
static inline std::function<std::string(const Poco::Net::HTTPServerRequest &)> headerGetter(const std::string & header_name)
{
return [header_name](const Poco::Net::HTTPServerRequest & request) { return request.get(header_name, ""); };
}
static inline auto methodsExpressionFilter(const std::string &methods_expression)
{
Poco::StringTokenizer tokenizer(Poco::toUpper(Poco::trim(methods_expression)), ",");
return [methods = std::vector<String>(tokenizer.begin(), tokenizer.end())](const Poco::Net::HTTPServerRequest & request)
{
return std::count(methods.begin(), methods.end(), request.getMethod());
};
}
template <typename GetFunction>
static inline auto regularExpressionFilter(const std::string & regular_expression, const GetFunction & get)
{
auto compiled_regex = std::make_shared<re2_st::RE2>(regular_expression);
auto compiled_regex = std::make_shared<re2_st::RE2>(regex);
if (!compiled_regex->ok())
throw Exception("cannot compile re2: " + regular_expression + " for routing_rule, error: " + compiled_regex->error() +
throw Exception("cannot compile re2: " + expression.toString() + " for routing_rule, error: " + compiled_regex->error() +
". Look at https://github.com/google/re2/wiki/Syntax for reference.", ErrorCodes::CANNOT_COMPILE_REGEXP);
return std::make_pair(compiled_regex, [get = std::move(get), compiled_regex](const Poco::Net::HTTPServerRequest & request)
{
const auto & test_content = get(request);
int num_captures = compiled_regex->NumberOfCapturingGroups() + 1;
re2_st::StringPiece matches[num_captures];
re2_st::StringPiece input(test_content.data(), test_content.size());
return compiled_regex->Match(input, 0, test_content.size(), re2_st::RE2::Anchor::ANCHOR_BOTH, matches, num_captures);
});
re2_st::StringPiece match_input(match_str.data, match_str.size);
return compiled_regex->Match(match_input, 0, match_str.size, re2_st::RE2::Anchor::ANCHOR_BOTH, matches, num_captures);
}
template <typename GetFunction>
static inline std::function<bool(const Poco::Net::HTTPServerRequest &)> expressionFilter(const std::string & expression, const GetFunction & get)
static inline bool checkExpression(const StringRef & match_str, const std::string & expression)
{
if (startsWith(expression, "regex:"))
return regularExpressionFilter(expression, get).second;
return checkRegexExpression(match_str, expression.substr(6));
return [expression, get = std::move(get)](const Poco::Net::HTTPServerRequest & request) { return get(request) == expression; };
return match_str == expression;
}
static inline auto methodsFilter(Poco::Util::AbstractConfiguration & config, const std::string & config_path)
{
std::vector<String> methods;
Poco::StringTokenizer tokenizer(config.getString(config_path), ",");
for (auto iterator = tokenizer.begin(); iterator != tokenizer.end(); ++iterator)
methods.emplace_back(Poco::toUpper(Poco::trim(*iterator)));
return [methods](const Poco::Net::HTTPServerRequest & request) { return std::count(methods.begin(), methods.end(), request.getMethod()); };
}
static inline auto urlFilter(Poco::Util::AbstractConfiguration & config, const std::string & config_path)
{
return [expression = config.getString(config_path)](const Poco::Net::HTTPServerRequest & request)
{
const auto & uri = request.getURI();
const auto & end = find_first_symbols<'?'>(uri.data(), uri.data() + uri.size());
return checkExpression(StringRef(uri.data(), end - uri.data()), expression);
};
}
static inline auto headersFilter(Poco::Util::AbstractConfiguration & config, const std::string & prefix)
{
std::unordered_map<String, String> headers_expression;
Poco::Util::AbstractConfiguration::Keys headers_name;
config.keys(prefix, headers_name);
for (const auto & header_name : headers_name)
{
const auto & expression = config.getString(prefix + "." + header_name);
checkExpression("", expression); /// Check expression syntax is correct
headers_expression.emplace(std::make_pair(header_name, expression));
}
return [headers_expression](const Poco::Net::HTTPServerRequest & request)
{
for (const auto & [header_name, header_expression] : headers_expression)
{
const auto & header_value = request.get(header_name);
if (!checkExpression(StringRef(header_value.data(), header_value.size()), header_expression))
return false;
}
return true;
};
}
template <typename TEndpoint>
@ -84,12 +108,15 @@ static inline Poco::Net::HTTPRequestHandlerFactory * addFiltersFromConfig(
for (const auto & filter_type : filters_type)
{
if (filter_type == "handler")
continue; /// Skip handler config
else if (filter_type == "method")
factory->addFilter(methodsExpressionFilter(config.getString(prefix + "." + filter_type)));
continue;
else if (filter_type == "url")
factory->addFilter(urlFilter(config, prefix + ".url"));
else if (filter_type == "headers")
factory->addFilter(headersFilter(config, prefix + ".headers"));
else if (filter_type == "methods")
factory->addFilter(methodsFilter(config, prefix + ".methods"));
else
factory->addFilter(expressionFilter(config.getString(prefix + "." + filter_type), filter_type == "url"
? uriPathGetter : headerGetter(filter_type)));
throw Exception("Unknown element in config: " + prefix + "." + filter_type, ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG);
}
return factory;

View File

@ -18,7 +18,7 @@ private:
const PrometheusMetricsWriter & metrics_writer;
public:
explicit PrometheusRequestHandler(IServer & server_, PrometheusMetricsWriter & metrics_writer_)
explicit PrometheusRequestHandler(IServer & server_, const PrometheusMetricsWriter & metrics_writer_)
: server(server_)
, metrics_writer(metrics_writer_)
{
@ -29,33 +29,4 @@ public:
Poco::Net::HTTPServerResponse & response) override;
};
template <typename HandlerType>
class PrometheusRequestHandlerFactory : public Poco::Net::HTTPRequestHandlerFactory
{
private:
IServer & server;
std::string endpoint_path;
PrometheusMetricsWriter metrics_writer;
public:
PrometheusRequestHandlerFactory(IServer & server_, const AsynchronousMetrics & async_metrics_)
: server(server_)
, endpoint_path(server_.config().getString("prometheus.endpoint", "/metrics"))
, metrics_writer(server_.config(), "prometheus", async_metrics_)
{
}
Poco::Net::HTTPRequestHandler * createRequestHandler(const Poco::Net::HTTPServerRequest & request) override
{
if (request.getMethod() == Poco::Net::HTTPRequest::HTTP_GET
&& request.getURI() == endpoint_path)
return new HandlerType(server, metrics_writer);
return nullptr;
}
};
using PrometheusHandlerFactory = PrometheusRequestHandlerFactory<PrometheusRequestHandler>;
}

View File

@ -769,7 +769,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
socket.setSendTimeout(settings.http_send_timeout);
servers.emplace_back(std::make_unique<Poco::Net::HTTPServer>(
createHandlerFactory(*this, "HTTPHandler-factory"), server_pool, socket, http_params));
createHandlerFactory(*this, async_metrics, "HTTPHandler-factory"), server_pool, socket, http_params));
LOG_INFO(log, "Listening for http://" + address.toString());
});
@ -783,7 +783,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
socket.setReceiveTimeout(settings.http_receive_timeout);
socket.setSendTimeout(settings.http_send_timeout);
servers.emplace_back(std::make_unique<Poco::Net::HTTPServer>(
createHandlerFactory(*this, "HTTPSHandler-factory"), server_pool, socket, http_params));
createHandlerFactory(*this, async_metrics, "HTTPSHandler-factory"), server_pool, socket, http_params));
LOG_INFO(log, "Listening for https://" + address.toString());
#else
@ -838,7 +838,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
socket.setReceiveTimeout(settings.http_receive_timeout);
socket.setSendTimeout(settings.http_send_timeout);
servers.emplace_back(std::make_unique<Poco::Net::HTTPServer>(
createHandlerFactory(*this, "InterserverIOHTTPHandler-factory"), server_pool, socket, http_params));
createHandlerFactory(*this, async_metrics, "InterserverIOHTTPHandler-factory"), server_pool, socket, http_params));
LOG_INFO(log, "Listening for replica communication (interserver): http://" + address.toString());
});
@ -851,7 +851,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
socket.setReceiveTimeout(settings.http_receive_timeout);
socket.setSendTimeout(settings.http_send_timeout);
servers.emplace_back(std::make_unique<Poco::Net::HTTPServer>(
createHandlerFactory(*this, "InterserverIOHTTPSHandler-factory"), server_pool, socket, http_params));
createHandlerFactory(*this, async_metrics, "InterserverIOHTTPSHandler-factory"), server_pool, socket, http_params));
LOG_INFO(log, "Listening for secure replica communication (interserver): https://" + address.toString());
#else
@ -877,22 +877,17 @@ int Server::main(const std::vector<std::string> & /*args*/)
});
/// Prometheus (if defined and not setup yet with http_port)
// create_server("prometheus.port", [&](UInt16 port)
// {
// Poco::Net::ServerSocket socket;
// auto address = socket_bind_listen(socket, listen_host, port);
// socket.setReceiveTimeout(settings.http_receive_timeout);
// socket.setSendTimeout(settings.http_send_timeout);
// auto handler_factory = new HTTPRequestHandlerFactoryMain(*this, "PrometheusHandler-factory");
// handler_factory->addHandler<PrometheusHandlerFactory>(async_metrics);
// servers.emplace_back(std::make_unique<Poco::Net::HTTPServer>(
// handler_factory,
// server_pool,
// socket,
// http_params));
//
// LOG_INFO(log, "Listening for Prometheus: http://" + address.toString());
// });
create_server("prometheus.port", [&](UInt16 port)
{
Poco::Net::ServerSocket socket;
auto address = socket_bind_listen(socket, listen_host, port);
socket.setReceiveTimeout(settings.http_receive_timeout);
socket.setSendTimeout(settings.http_send_timeout);
servers.emplace_back(std::make_unique<Poco::Net::HTTPServer>(
createHandlerFactory(*this, async_metrics, "PrometheusHandler-factory"), server_pool, socket, http_params));
LOG_INFO(log, "Listening for Prometheus: http://" + address.toString());
});
}
if (servers.empty())

View File

@ -75,12 +75,12 @@ StaticRequestHandler::StaticRequestHandler(IServer & server_, const String & exp
Poco::Net::HTTPRequestHandlerFactory * createStaticHandlerFactory(IServer & server, const std::string & config_prefix)
{
const auto & status = server.config().getInt(config_prefix + ".handler.status", 200);
const auto & response_content = server.config().getRawString(config_prefix + ".handler.response_content", "Ok.\n");
const auto & response_content_type = server.config().getString(config_prefix + ".handler.content_type", "text/plain; charset=UTF-8");
int status = server.config().getInt(config_prefix + ".handler.status", 200);
std::string response_content = server.config().getRawString(config_prefix + ".handler.response_content", "Ok.\n");
std::string response_content_type = server.config().getString(config_prefix + ".handler.content_type", "text/plain; charset=UTF-8");
return addFiltersFromConfig(new RoutingRuleHTTPHandlerFactory<StaticRequestHandler>(
server, response_content, status, response_content_type), server.config(), config_prefix);
server, std::move(response_content), status, std::move(response_content_type)), server.config(), config_prefix);
}
}

View File

@ -525,30 +525,45 @@
</query_masking_rules>
-->
<http_handlers>
<root_handler/>
<ping_handler>/ping</ping_handler>
<replicas_status_handler>/replicas_status</replicas_status_handler>
<dynamic_query_handler><query_param_name>query</query_param_name></dynamic_query_handler>
<!-- Uncomment to use custom http routing.
url -
<no_handler_description>Use / or /ping for health checks.&#10;Or /replicas_status for more sophisticated health checks.&#10;Send queries from your program with POST method or GET /?query=...&#10;&#10;Use clickhouse-client:&#10;&#10;For interactive data analysis:&#10;clickhouse-client&#10;&#10;For batch query processing:&#10;clickhouse-client --query='SELECT 1' &gt; result&#10;clickhouse-client &lt; query &gt; result&#10;</no_handler_description>
<!-- Uncomment to use predefined queries.
url - RE2 compatible regular expression (optional)
method - HTTP method(optional)
headers - HTTP Header(optional)
queries - predefined queries (mandatory)
<predefined_query_handler>
<url>/test_simple_predefine</url>
<method>GET</method>
<headers> <X-ClickHouse-User>default</X-ClickHouse-User></headers>
<queries>
<query>SELECT 1, {query_prepared_param_1:String}</query>
<query>SELECT 1, {query_prepared_param_2:String}</query>
</queries>
</predefined_query_handler>
-->
</http_handlers>
<!--
<routing_rules>
&lt;!&ndash; rules are checked from top to bottom, first match runs the handler &ndash;&gt;
<routing_rule>
<url>/</url> &lt;!&ndash; match requests on that url &ndash;&gt;
<methods>POST,GET</methods> &lt;!&ndash; and with that method &ndash;&gt;
&lt;!&ndash; other possible request matching rules can be introduced, for headers, ips, etc &ndash;&gt;
<pragma>no-cache</pragma>
<handler>
<type>dynamic_query_handler</type>
<query_param_name>query</query_param_name>
</handler>
</routing_rule>
<routing_rule>
<url>/</url>
<handler>
<type>static</type>
<status>200</status>
<content-type>text/plain; charset=UTF-8</content-type>
<response_content>config://http_server_default_response</response_content>
</handler>
</routing_rule>
<rule> &lt;!&ndash; no conditions, i.e. match everything &ndash;&gt;
<handler>
<type>static</type>
<status>404</status>
<content-type>text/html</content-type>
<reponse_content><body><h1>Check your query</h1></body></reponse_content>
</handler>
</rule>
</routing_rules>
-->
<!-- Uncomment to disable ClickHouse internal DNS caching. -->
<!-- <disable_internal_dns_cache>1</disable_internal_dns_cache> -->

View File

@ -701,7 +701,7 @@ class ClickHouseInstance:
# Connects to the instance via HTTP interface, sends a query and returns the answer
def http_request(self, url, method='GET', params=None, data=None, headers=None):
url = "http://" + self.ip_address + ":8123/"+url
return requests.request(method=method, url=url, params=params, data=data, headers=headers).content
return requests.request(method=method, url=url, params=params, data=data, headers=headers)
# Connects to the instance via HTTP interface, sends a query, expects an error and return the error message
def http_query_and_get_error(self, sql, data=None, params=None, user=None, password=None):

View File

@ -0,0 +1,50 @@
import os
import urllib
import contextlib
from helpers.cluster import ClickHouseCluster
class SimpleCluster:
def close(self):
self.cluster.shutdown()
def __init__(self, cluster, name, config_dir):
self.cluster = cluster
self.instance = self.add_instance(name, config_dir)
cluster.start()
def add_instance(self, name, config_dir):
script_path = os.path.dirname(os.path.realpath(__file__))
return self.cluster.add_instance(name, config_dir=os.path.join(script_path, config_dir),
main_configs=[os.path.join(script_path, 'common_configs', 'common_config.xml')],
user_configs=[os.path.join(script_path, 'common_configs', 'common_users.xml')])
def test_dynamic_query_handler():
with contextlib.closing(SimpleCluster(ClickHouseCluster(__file__), "dynamic_handler", "test_dynamic_handler")) as cluster:
test_query = urllib.quote_plus('SELECT * FROM system.settings WHERE name = \'max_threads\'')
assert 404 == cluster.instance.http_request('?max_threads=1', method='GET', headers={'XXX': 'xxx'}).status_code
assert 404 == cluster.instance.http_request('test_dynamic_handler_get?max_threads=1', method='POST', headers={'XXX': 'xxx'}).status_code
assert 404 == cluster.instance.http_request('test_dynamic_handler_get?max_threads=1', method='GET', headers={'XXX': 'bad'}).status_code
assert 400 == cluster.instance.http_request('test_dynamic_handler_get?max_threads=1', method='GET', headers={'XXX': 'xxx'}).status_code
assert 200 == cluster.instance.http_request('test_dynamic_handler_get?max_threads=1&get_dynamic_handler_query=' + test_query,
method='GET', headers={'XXX': 'xxx'}).status_code
def test_predefine_query_handler():
with contextlib.closing(SimpleCluster(ClickHouseCluster(__file__), "predefined_handler", "test_predefined_handler")) as cluster:
assert 404 == cluster.instance.http_request('?max_threads=1', method='GET', headers={'XXX': 'xxx'}).status_code
assert 404 == cluster.instance.http_request('test_predefine_handler_get?max_threads=1', method='GET', headers={'XXX': 'bad'}).status_code
assert 404 == cluster.instance.http_request('test_predefine_handler_get?max_threads=1', method='POST', headers={'XXX': 'xxx'}).status_code
assert 400 == cluster.instance.http_request('test_predefine_handler_get?max_threads=1', method='GET', headers={'XXX': 'xxx'}).status_code
assert '1\n' == cluster.instance.http_request('test_predefine_handler_get?max_threads=1&setting_name=max_threads', method='GET', headers={'XXX': 'xxx'}).content

View File

@ -0,0 +1,15 @@
<?xml version="1.0"?>
<yandex>
<routing_rules>
<routing_rule>
<headers><XXX>xxx</XXX></headers>
<methods>GET</methods>
<url>/test_dynamic_handler_get</url>
<handler>
<type>dynamic_query_handler</type>
<query_param_name>get_dynamic_handler_query</query_param_name>
</handler>
</routing_rule>
</routing_rules>
</yandex>

View File

@ -0,0 +1,15 @@
<?xml version="1.0"?>
<yandex>
<routing_rules>
<routing_rule>
<methods>GET</methods>
<headers><XXX>xxx</XXX></headers>
<url>/test_predefine_handler_get</url>
<handler>
<type>predefine_query_handler</type>
<query>SELECT * FROM system.settings WHERE name = {setting_name:String}</query>
</handler>
</routing_rule>
</routing_rules>
</yandex>