mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-10 01:25:21 +00:00
Merge pull request #47086 from pkit/pkit/insert_http_body
use http request body in predefined http handlers
This commit is contained in:
commit
f4d9b4a888
@ -718,6 +718,7 @@ class IColumn;
|
||||
M(Float, insert_keeper_fault_injection_probability, 0.0f, "Approximate probability of failure for a keeper request during insert. Valid value is in interval [0.0f, 1.0f]", 0) \
|
||||
M(UInt64, insert_keeper_fault_injection_seed, 0, "0 - random seed, otherwise the setting value", 0) \
|
||||
M(Bool, force_aggregation_in_order, false, "Force use of aggregation in order on remote nodes during distributed aggregation. PLEASE, NEVER CHANGE THIS SETTING VALUE MANUALLY!", IMPORTANT) \
|
||||
M(UInt64, http_max_request_param_data_size, 10_MiB, "Limit on size of request data used as a query parameter in predefined HTTP requests.", 0) \
|
||||
// End of COMMON_SETTINGS
|
||||
// Please add settings related to formats into the FORMAT_FACTORY_SETTINGS and move obsolete settings to OBSOLETE_SETTINGS.
|
||||
|
||||
|
@ -10,6 +10,7 @@ namespace DB
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int ATTEMPT_TO_READ_AFTER_EOF;
|
||||
extern const int CANNOT_READ_ALL_DATA;
|
||||
}
|
||||
|
||||
namespace
|
||||
@ -91,6 +92,13 @@ void copyData(ReadBuffer & from, WriteBuffer & to, size_t bytes, std::function<v
|
||||
copyDataImpl(from, to, true, bytes, cancellation_hook, nullptr);
|
||||
}
|
||||
|
||||
void copyDataMaxBytes(ReadBuffer & from, WriteBuffer & to, size_t max_bytes)
|
||||
{
|
||||
copyDataImpl(from, to, false, max_bytes, nullptr, nullptr);
|
||||
if (!from.eof())
|
||||
throw Exception(ErrorCodes::CANNOT_READ_ALL_DATA, "Cannot read all data, max readable size reached.");
|
||||
}
|
||||
|
||||
void copyDataWithThrottler(ReadBuffer & from, WriteBuffer & to, const std::atomic<int> & is_cancelled, ThrottlerPtr throttler)
|
||||
{
|
||||
copyDataImpl(from, to, false, std::numeric_limits<size_t>::max(), &is_cancelled, throttler);
|
||||
|
@ -27,6 +27,9 @@ void copyData(ReadBuffer & from, WriteBuffer & to, size_t bytes, const std::atom
|
||||
void copyData(ReadBuffer & from, WriteBuffer & to, std::function<void()> cancellation_hook);
|
||||
void copyData(ReadBuffer & from, WriteBuffer & to, size_t bytes, std::function<void()> cancellation_hook);
|
||||
|
||||
/// Copies at most `max_bytes` bytes from ReadBuffer to WriteBuffer. If there are more bytes, then throws an exception.
|
||||
void copyDataMaxBytes(ReadBuffer & from, WriteBuffer & to, size_t max_bytes);
|
||||
|
||||
/// Same as above but also use throttler to limit maximum speed
|
||||
void copyDataWithThrottler(ReadBuffer & from, WriteBuffer & to, const std::atomic<int> & is_cancelled, ThrottlerPtr throttler);
|
||||
void copyDataWithThrottler(ReadBuffer & from, WriteBuffer & to, size_t bytes, const std::atomic<int> & is_cancelled, ThrottlerPtr throttler);
|
||||
|
@ -83,7 +83,10 @@ void ReplaceQueryParameterVisitor::visitQueryParameter(ASTPtr & ast)
|
||||
IColumn & temp_column = *temp_column_ptr;
|
||||
ReadBufferFromString read_buffer{value};
|
||||
FormatSettings format_settings;
|
||||
data_type->getDefaultSerialization()->deserializeTextEscaped(temp_column, read_buffer, format_settings);
|
||||
if (ast_param.name == "_request_body")
|
||||
data_type->getDefaultSerialization()->deserializeWholeText(temp_column, read_buffer, format_settings);
|
||||
else
|
||||
data_type->getDefaultSerialization()->deserializeTextEscaped(temp_column, read_buffer, format_settings);
|
||||
|
||||
if (!read_buffer.eof())
|
||||
throw Exception(ErrorCodes::BAD_QUERY_PARAMETER,
|
||||
|
@ -783,7 +783,6 @@ void HTTPHandler::processQuery(
|
||||
/// they will be applied in ProcessList::insert() from executeQuery() itself.
|
||||
const auto & query = getQuery(request, params, context);
|
||||
std::unique_ptr<ReadBuffer> in_param = std::make_unique<ReadBufferFromString>(query);
|
||||
in = has_external_data ? std::move(in_param) : std::make_unique<ConcatReadBuffer>(*in_param, *in_post_maybe_compressed);
|
||||
|
||||
/// HTTP response compression is turned on only if the client signalled that they support it
|
||||
/// (using Accept-Encoding header) and 'enable_http_compression' setting is turned on.
|
||||
@ -833,7 +832,8 @@ void HTTPHandler::processQuery(
|
||||
});
|
||||
}
|
||||
|
||||
customizeContext(request, context);
|
||||
customizeContext(request, context, *in_post_maybe_compressed);
|
||||
in = has_external_data ? std::move(in_param) : std::make_unique<ConcatReadBuffer>(*in_param, *in_post_maybe_compressed);
|
||||
|
||||
executeQuery(*in, *used_output.out_maybe_delayed_and_compressed, /* allow_into_outfile = */ false, context,
|
||||
[&response, this] (const QueryResultDetails & details)
|
||||
@ -1153,7 +1153,7 @@ bool PredefinedQueryHandler::customizeQueryParam(ContextMutablePtr context, cons
|
||||
return false;
|
||||
}
|
||||
|
||||
void PredefinedQueryHandler::customizeContext(HTTPServerRequest & request, ContextMutablePtr context)
|
||||
void PredefinedQueryHandler::customizeContext(HTTPServerRequest & request, ContextMutablePtr context, ReadBuffer & body)
|
||||
{
|
||||
/// If in the configuration file, the handler's header is regex and contains named capture group
|
||||
/// We will extract regex named capture groups as query parameters
|
||||
@ -1187,6 +1187,15 @@ void PredefinedQueryHandler::customizeContext(HTTPServerRequest & request, Conte
|
||||
const auto & header_value = request.get(header_name);
|
||||
set_query_params(header_value.data(), header_value.data() + header_value.size(), regex);
|
||||
}
|
||||
|
||||
if (unlikely(receive_params.contains("_request_body") && !context->getQueryParameters().contains("_request_body")))
|
||||
{
|
||||
WriteBufferFromOwnString value;
|
||||
const auto & settings = context->getSettingsRef();
|
||||
|
||||
copyDataMaxBytes(body, value, settings.http_max_request_param_data_size);
|
||||
context->setQueryParameter("_request_body", value.str());
|
||||
}
|
||||
}
|
||||
|
||||
std::string PredefinedQueryHandler::getQuery(HTTPServerRequest & request, HTMLForm & params, ContextMutablePtr context)
|
||||
|
@ -36,7 +36,7 @@ public:
|
||||
void handleRequest(HTTPServerRequest & request, HTTPServerResponse & response) override;
|
||||
|
||||
/// This method is called right before the query execution.
|
||||
virtual void customizeContext(HTTPServerRequest & /* request */, ContextMutablePtr /* context */) {}
|
||||
virtual void customizeContext(HTTPServerRequest & /* request */, ContextMutablePtr /* context */, ReadBuffer & /* body */) {}
|
||||
|
||||
virtual bool customizeQueryParam(ContextMutablePtr context, const std::string & key, const std::string & value) = 0;
|
||||
|
||||
@ -163,7 +163,7 @@ public:
|
||||
, const CompiledRegexPtr & url_regex_, const std::unordered_map<String, CompiledRegexPtr> & header_name_with_regex_
|
||||
, const std::optional<std::string> & content_type_override_);
|
||||
|
||||
virtual void customizeContext(HTTPServerRequest & request, ContextMutablePtr context) override;
|
||||
void customizeContext(HTTPServerRequest & request, ContextMutablePtr context, ReadBuffer & body) override;
|
||||
|
||||
std::string getQuery(HTTPServerRequest & request, HTMLForm & params, ContextMutablePtr context) override;
|
||||
|
||||
|
@ -147,6 +147,18 @@ def test_predefined_query_handler():
|
||||
assert b"max_final_threads\t1\nmax_threads\t1\n" == res2.content
|
||||
assert "application/generic+one" == res2.headers["content-type"]
|
||||
|
||||
cluster.instance.query(
|
||||
"CREATE TABLE test_table (id UInt32, data String) Engine=TinyLog"
|
||||
)
|
||||
res3 = cluster.instance.http_request(
|
||||
"test_predefined_handler_post_body?id=100",
|
||||
method="POST",
|
||||
data="TEST".encode("utf8"),
|
||||
)
|
||||
assert res3.status_code == 200
|
||||
assert cluster.instance.query("SELECT * FROM test_table") == "100\tTEST\n"
|
||||
cluster.instance.query("DROP TABLE test_table")
|
||||
|
||||
|
||||
def test_fixed_static_handler():
|
||||
with contextlib.closing(
|
||||
|
@ -21,5 +21,13 @@
|
||||
<content_type>application/generic+one</content_type>
|
||||
</handler>
|
||||
</rule>
|
||||
<rule>
|
||||
<methods>POST</methods>
|
||||
<url>/test_predefined_handler_post_body</url>
|
||||
<handler>
|
||||
<type>predefined_query_handler</type>
|
||||
<query>INSERT INTO test_table(id, data) SELECT {id:UInt32}, {_request_body:String}</query>
|
||||
</handler>
|
||||
</rule>
|
||||
</http_handlers>
|
||||
</clickhouse>
|
||||
|
Loading…
Reference in New Issue
Block a user