This commit is contained in:
Konstantin Bogdanov 2024-09-16 21:27:53 +08:00 committed by GitHub
commit 7b98124e8a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 182 additions and 96 deletions

View File

@ -1,20 +1,27 @@
#include <Server/HTTP/WriteBufferFromHTTPServerResponse.h> #include <Server/HTTP/WriteBufferFromHTTPServerResponse.h>
#include <IO/HTTPCommon.h> #include <IO/HTTPCommon.h>
#include <IO/Progress.h> #include <IO/Progress.h>
#include <IO/WriteBufferFromString.h> #include <IO/WriteBufferFromString.h>
#include <IO/WriteHelpers.h> #include <IO/WriteHelpers.h>
#include <memory>
#include <sstream> #include <sstream>
#include <string> #include <string>
namespace DB namespace DB
{ {
/// Externally callable way to begin sending the HTTP response headers.
/// Acquires the buffer's mutex for the duration of the call and then
/// delegates to startSendHeaders().
void WriteBufferFromHTTPServerResponse::startSendHeadersPublic()
{
    /// Hold the mutex until startSendHeaders() has returned.
    std::lock_guard guard{mutex};
    startSendHeaders();
}
void WriteBufferFromHTTPServerResponse::startSendHeaders() void WriteBufferFromHTTPServerResponse::startSendHeaders()
{ {
if (!headers_started_sending) if (headers_started_sending)
{ return;
headers_started_sending = true; headers_started_sending = true;
if (response.getChunkedTransferEncoding()) if (response.getChunkedTransferEncoding())
@ -36,7 +43,6 @@ void WriteBufferFromHTTPServerResponse::startSendHeaders()
response.beginWrite(header); response.beginWrite(header);
auto header_str = header.str(); auto header_str = header.str();
socketSendBytes(header_str.data(), header_str.size()); socketSendBytes(header_str.data(), header_str.size());
}
} }
void WriteBufferFromHTTPServerResponse::writeHeaderProgressImpl(const char * header_name) void WriteBufferFromHTTPServerResponse::writeHeaderProgressImpl(const char * header_name)
@ -68,6 +74,7 @@ void WriteBufferFromHTTPServerResponse::writeExceptionCode()
{ {
if (headers_finished_sending || !exception_code) if (headers_finished_sending || !exception_code)
return; return;
if (headers_started_sending) if (headers_started_sending)
{ {
socketSendBytes("X-ClickHouse-Exception-Code: ", sizeof("X-ClickHouse-Exception-Code: ") - 1); socketSendBytes("X-ClickHouse-Exception-Code: ", sizeof("X-ClickHouse-Exception-Code: ") - 1);
@ -129,10 +136,18 @@ void WriteBufferFromHTTPServerResponse::nextImpl()
WriteBufferFromHTTPServerResponse::WriteBufferFromHTTPServerResponse( WriteBufferFromHTTPServerResponse::WriteBufferFromHTTPServerResponse(
HTTPServerResponse & response_, HTTPServerResponse & response_,
bool is_http_method_head_, bool is_http_method_head_,
const ProfileEvents::Event & write_event_) const ProfileEvents::Event & write_event_,
const CompressionMethod & compression_method_,
bool add_cors_header_,
bool send_progress_,
size_t send_progress_interval_ms_)
: HTTPWriteBuffer(response_.getSocket(), write_event_) : HTTPWriteBuffer(response_.getSocket(), write_event_)
, response(response_) , response(response_)
, is_http_method_head(is_http_method_head_) , is_http_method_head(is_http_method_head_)
, add_cors_header(add_cors_header_)
, send_progress(send_progress_)
, send_progress_interval_ms(send_progress_interval_ms_)
, compression_method(compression_method_)
{ {
} }

View File

@ -11,7 +11,6 @@
#include <Common/Stopwatch.h> #include <Common/Stopwatch.h>
#include <mutex> #include <mutex>
#include <optional>
namespace DB namespace DB
@ -29,7 +28,11 @@ public:
WriteBufferFromHTTPServerResponse( WriteBufferFromHTTPServerResponse(
HTTPServerResponse & response_, HTTPServerResponse & response_,
bool is_http_method_head_, bool is_http_method_head_,
const ProfileEvents::Event & write_event_ = ProfileEvents::end()); const ProfileEvents::Event & write_event_ = ProfileEvents::end(),
const CompressionMethod & compression_method_ = CompressionMethod::None,
bool add_cors_header_ = false,
bool send_progress_ = false,
size_t send_progress_interval_ms_ = 100);
~WriteBufferFromHTTPServerResponse() override; ~WriteBufferFromHTTPServerResponse() override;
@ -60,6 +63,9 @@ public:
void setExceptionCode(int exception_code_); void setExceptionCode(int exception_code_);
/// FIXME: public wrapper that takes the mutex and then calls startSendHeaders().
void startSendHeadersPublic();
private: private:
/// Send at least HTTP headers if no data has been sent yet. /// Send at least HTTP headers if no data has been sent yet.
/// Use after the data has possibly been sent and no error happened (and thus you do not plan /// Use after the data has possibly been sent and no error happened (and thus you do not plan
@ -89,19 +95,19 @@ private:
HTTPServerResponse & response; HTTPServerResponse & response;
bool is_http_method_head; bool is_http_method_head;
bool add_cors_header = false; bool add_cors_header;
bool initialized = false; bool initialized = false;
bool headers_started_sending = false; std::atomic_bool headers_started_sending = false;
bool headers_finished_sending = false; /// If true, you could not add any headers. std::atomic_bool headers_finished_sending = false; /// If true, you could not add any headers.
Progress accumulated_progress; Progress accumulated_progress;
bool send_progress = false; bool send_progress;
size_t send_progress_interval_ms = 100; size_t send_progress_interval_ms;
Stopwatch progress_watch; Stopwatch progress_watch;
CompressionMethod compression_method = CompressionMethod::None; CompressionMethod compression_method;
int exception_code = 0; int exception_code = 0;

View File

@ -261,18 +261,79 @@ void HTTPHandler::processQuery(
bool enable_http_compression = params.getParsed<bool>("enable_http_compression", context->getSettingsRef().enable_http_compression); bool enable_http_compression = params.getParsed<bool>("enable_http_compression", context->getSettingsRef().enable_http_compression);
Int64 http_zlib_compression_level = params.getParsed<Int64>("http_zlib_compression_level", context->getSettingsRef().http_zlib_compression_level); Int64 http_zlib_compression_level = params.getParsed<Int64>("http_zlib_compression_level", context->getSettingsRef().http_zlib_compression_level);
auto chosen_compression_method = enable_http_compression ? http_response_compression_method : CompressionMethod::None;
bool has_external_data = startsWith(request.getContentType(), "multipart/form-data");
auto param_could_be_skipped = [&] (const String & name)
{
/// Empty parameter appears when URL like ?&a=b or a=b&&c=d. Just skip them for user's convenience.
if (name.empty())
return true;
/// Some parameters (database, default_format, everything used in the code above) do not
/// belong to the Settings class.
static const NameSet reserved_param_names{"compress", "decompress", "user", "password", "quota_key", "query_id", "stacktrace", "role",
"buffer_size", "wait_end_of_query", "session_id", "session_timeout", "session_check", "client_protocol_version", "close_session",
"database", "default_format"};
if (reserved_param_names.contains(name))
return true;
/// For external data we also want settings.
if (has_external_data)
{
/// Skip unneeded parameters to avoid confusing them later with context settings or query parameters.
/// It is a bug and ambiguity with `date_time_input_format` and `low_cardinality_allow_in_native_format` formats/settings.
static const Names reserved_param_suffixes = {"_format", "_types", "_structure"};
for (const String & suffix : reserved_param_suffixes)
{
if (endsWith(name, suffix))
return true;
}
}
return false;
};
/// Settings can be overridden in the query.
SettingsChanges settings_changes;
for (const auto & [key, value] : params)
{
if (!param_could_be_skipped(key))
{
/// Other than query parameters are treated as settings.
if (!customizeQueryParam(context, key, value))
settings_changes.push_back({key, value});
}
}
context->checkSettingsConstraints(settings_changes, SettingSource::QUERY);
context->applySettingsChanges(settings_changes);
const auto & settings = context->getSettingsRef();
/// Set the query id supplied by the user, if any, and also update the OpenTelemetry fields.
context->setCurrentQueryId(params.get("query_id", request.get("X-ClickHouse-Query-Id", "")));
/// Initialize query scope, once query_id is initialized.
/// (To track as much allocations as possible)
query_scope.emplace(context);
bool enable_cors = settings.add_http_cors_header && !request.get("Origin", "").empty() && !config.has("http_options_response");
used_output.out_holder = used_output.out_holder =
std::make_shared<WriteBufferFromHTTPServerResponse>( std::make_shared<WriteBufferFromHTTPServerResponse>(
response, response,
request.getMethod() == HTTPRequest::HTTP_HEAD, request.getMethod() == HTTPRequest::HTTP_HEAD,
write_event); write_event,
chosen_compression_method,
enable_cors,
settings.send_progress_in_http_headers,
settings.http_headers_progress_interval_ms);
used_output.out = used_output.out_holder; used_output.out = used_output.out_holder;
used_output.out_maybe_compressed = used_output.out_holder; used_output.out_maybe_compressed = used_output.out_holder;
if (client_supports_http_compression && enable_http_compression) if (client_supports_http_compression && enable_http_compression)
{ {
used_output.out_holder->setCompressionMethodHeader(http_response_compression_method);
used_output.wrap_compressed_holder = used_output.wrap_compressed_holder =
wrapWriteBufferWithCompressionMethod( wrapWriteBufferWithCompressionMethod(
used_output.out_holder.get(), used_output.out_holder.get(),
@ -369,61 +430,61 @@ void HTTPHandler::processQuery(
/// Anything else beside HTTP POST should be readonly queries. /// Anything else beside HTTP POST should be readonly queries.
setReadOnlyIfHTTPMethodIdempotent(context, request.getMethod()); setReadOnlyIfHTTPMethodIdempotent(context, request.getMethod());
bool has_external_data = startsWith(request.getContentType(), "multipart/form-data"); // bool has_external_data = startsWith(request.getContentType(), "multipart/form-data");
//
auto param_could_be_skipped = [&] (const String & name) // auto param_could_be_skipped = [&] (const String & name)
{ // {
/// Empty parameter appears when URL like ?&a=b or a=b&&c=d. Just skip them for user's convenience. // /// Empty parameter appears when URL like ?&a=b or a=b&&c=d. Just skip them for user's convenience.
if (name.empty()) // if (name.empty())
return true; // return true;
//
/// Some parameters (database, default_format, everything used in the code above) do not // /// Some parameters (database, default_format, everything used in the code above) do not
/// belong to the Settings class. // /// belong to the Settings class.
static const NameSet reserved_param_names{"compress", "decompress", "user", "password", "quota_key", "query_id", "stacktrace", "role", // static const NameSet reserved_param_names{"compress", "decompress", "user", "password", "quota_key", "query_id", "stacktrace", "role",
"buffer_size", "wait_end_of_query", "session_id", "session_timeout", "session_check", "client_protocol_version", "close_session", // "buffer_size", "wait_end_of_query", "session_id", "session_timeout", "session_check", "client_protocol_version", "close_session",
"database", "default_format"}; // "database", "default_format"};
//
if (reserved_param_names.contains(name)) // if (reserved_param_names.contains(name))
return true; // return true;
//
/// For external data we also want settings. // /// For external data we also want settings.
if (has_external_data) // if (has_external_data)
{ // {
/// Skip unneeded parameters to avoid confusing them later with context settings or query parameters. // /// Skip unneeded parameters to avoid confusing them later with context settings or query parameters.
/// It is a bug and ambiguity with `date_time_input_format` and `low_cardinality_allow_in_native_format` formats/settings. // /// It is a bug and ambiguity with `date_time_input_format` and `low_cardinality_allow_in_native_format` formats/settings.
static const Names reserved_param_suffixes = {"_format", "_types", "_structure"}; // static const Names reserved_param_suffixes = {"_format", "_types", "_structure"};
for (const String & suffix : reserved_param_suffixes) // for (const String & suffix : reserved_param_suffixes)
{ // {
if (endsWith(name, suffix)) // if (endsWith(name, suffix))
return true; // return true;
} // }
} // }
//
return false; // return false;
}; // };
//
/// Settings can be overridden in the query. // /// Settings can be overridden in the query.
SettingsChanges settings_changes; // SettingsChanges settings_changes;
for (const auto & [key, value] : params) // for (const auto & [key, value] : params)
{ // {
if (!param_could_be_skipped(key)) // if (!param_could_be_skipped(key))
{ // {
/// Other than query parameters are treated as settings. // /// Other than query parameters are treated as settings.
if (!customizeQueryParam(context, key, value)) // if (!customizeQueryParam(context, key, value))
settings_changes.push_back({key, value}); // settings_changes.push_back({key, value});
} // }
} // }
//
context->checkSettingsConstraints(settings_changes, SettingSource::QUERY); // context->checkSettingsConstraints(settings_changes, SettingSource::QUERY);
context->applySettingsChanges(settings_changes); // context->applySettingsChanges(settings_changes);
const auto & settings = context->getSettingsRef(); // const auto & settings = context->getSettingsRef();
//
/// Set the query id supplied by the user, if any, and also update the OpenTelemetry fields. // /// Set the query id supplied by the user, if any, and also update the OpenTelemetry fields.
context->setCurrentQueryId(params.get("query_id", request.get("X-ClickHouse-Query-Id", ""))); // context->setCurrentQueryId(params.get("query_id", request.get("X-ClickHouse-Query-Id", "")));
//
/// Initialize query scope, once query_id is initialized. // /// Initialize query scope, once query_id is initialized.
/// (To track as much allocations as possible) // /// (To track as much allocations as possible)
query_scope.emplace(context); // query_scope.emplace(context);
/// NOTE: this may create pretty huge allocations that will not be accounted in trace_log, /// NOTE: this may create pretty huge allocations that will not be accounted in trace_log,
/// because memory_profiler_sample_probability/memory_profiler_step are not applied yet, /// because memory_profiler_sample_probability/memory_profiler_step are not applied yet,
@ -431,8 +492,8 @@ void HTTPHandler::processQuery(
const auto & query = getQuery(request, params, context); const auto & query = getQuery(request, params, context);
std::unique_ptr<ReadBuffer> in_param = std::make_unique<ReadBufferFromString>(query); std::unique_ptr<ReadBuffer> in_param = std::make_unique<ReadBufferFromString>(query);
used_output.out_holder->setSendProgress(settings.send_progress_in_http_headers); // used_output.out_holder->setSendProgress(settings.send_progress_in_http_headers);
used_output.out_holder->setSendProgressInterval(settings.http_headers_progress_interval_ms); // used_output.out_holder->setSendProgressInterval(settings.http_headers_progress_interval_ms);
/// If 'http_native_compression_disable_checksumming_on_decompress' setting is turned on, /// If 'http_native_compression_disable_checksumming_on_decompress' setting is turned on,
/// checksums of client data compressed with internal algorithm are not checked. /// checksums of client data compressed with internal algorithm are not checked.
@ -442,8 +503,8 @@ void HTTPHandler::processQuery(
/// Add CORS header if 'add_http_cors_header' setting is turned on send * in Access-Control-Allow-Origin /// Add CORS header if 'add_http_cors_header' setting is turned on send * in Access-Control-Allow-Origin
/// Note that whether the header is added is determined by the settings, and we can only get the user settings after authentication. /// Note that whether the header is added is determined by the settings, and we can only get the user settings after authentication.
/// Once the authentication fails, the header can't be added. /// Once the authentication fails, the header can't be added.
if (settings.add_http_cors_header && !request.get("Origin", "").empty() && !config.has("http_options_response")) // if (settings.add_http_cors_header && !request.get("Origin", "").empty() && !config.has("http_options_response"))
used_output.out_holder->addHeaderCORS(true); // used_output.out_holder->addHeaderCORS(true);
auto append_callback = [my_context = context] (ProgressCallback callback) auto append_callback = [my_context = context] (ProgressCallback callback)
{ {
@ -481,7 +542,7 @@ void HTTPHandler::processQuery(
applyHTTPResponseHeaders(response, http_response_headers_override); applyHTTPResponseHeaders(response, http_response_headers_override);
auto set_query_result = [&response, this] (const QueryResultDetails & details) auto set_query_result = [&response, &used_output, this] (const QueryResultDetails & details)
{ {
response.add("X-ClickHouse-Query-Id", details.query_id); response.add("X-ClickHouse-Query-Id", details.query_id);
@ -494,6 +555,10 @@ void HTTPHandler::processQuery(
if (details.timezone) if (details.timezone)
response.add("X-ClickHouse-Timezone", *details.timezone); response.add("X-ClickHouse-Timezone", *details.timezone);
/// Start sending headers early, as it requires access to the internal
/// WriteBuffer, which may also be used concurrently by compressing wrapper.
used_output.out_holder->startSendHeadersPublic();
}; };
auto handle_exception_in_output_format = [&](IOutputFormat & current_output_format, const String & format_name, const ContextPtr & context_, const std::optional<FormatSettings> & format_settings) auto handle_exception_in_output_format = [&](IOutputFormat & current_output_format, const String & format_name, const ContextPtr & context_, const std::optional<FormatSettings> & format_settings)