From e3dd7b6668caa16d98be0ac1b570dd556511f6d3 Mon Sep 17 00:00:00 2001 From: Konstantin Bogdanov Date: Fri, 13 Sep 2024 17:58:29 +0200 Subject: [PATCH] temp-commit --- .../WriteBufferFromHTTPServerResponse.cpp | 63 +++--- .../HTTP/WriteBufferFromHTTPServerResponse.h | 26 ++- src/Server/HTTPHandler.cpp | 187 ++++++++++++------ 3 files changed, 181 insertions(+), 95 deletions(-) diff --git a/src/Server/HTTP/WriteBufferFromHTTPServerResponse.cpp b/src/Server/HTTP/WriteBufferFromHTTPServerResponse.cpp index 946eaf8aea4..44316ebef5b 100644 --- a/src/Server/HTTP/WriteBufferFromHTTPServerResponse.cpp +++ b/src/Server/HTTP/WriteBufferFromHTTPServerResponse.cpp @@ -1,42 +1,48 @@ #include + #include #include #include #include -#include #include #include namespace DB { +void WriteBufferFromHTTPServerResponse::startSendHeadersPublic() +{ + std::lock_guard lock(mutex); + + startSendHeaders(); +} void WriteBufferFromHTTPServerResponse::startSendHeaders() { - if (!headers_started_sending) + if (headers_started_sending) + return; + + headers_started_sending = true; + + if (response.getChunkedTransferEncoding()) + setChunked(); + else if (response.getContentLength() == Poco::Net::HTTPMessage::UNKNOWN_CONTENT_LENGTH) { - headers_started_sending = true; - - if (response.getChunkedTransferEncoding()) - setChunked(); - else if (response.getContentLength() == Poco::Net::HTTPMessage::UNKNOWN_CONTENT_LENGTH) - { - /// In case there is no Content-Length we cannot use keep-alive, - /// since there is no way to know when the server send all the - /// data, so "Connection: close" should be sent. - response.setKeepAlive(false); - } - - if (add_cors_header) - response.set("Access-Control-Allow-Origin", "*"); - - setResponseDefaultHeaders(response); - - std::stringstream header; //STYLE_CHECK_ALLOW_STD_STRING_STREAM - response.beginWrite(header); - auto header_str = header.str(); - socketSendBytes(header_str.data(), header_str.size()); + /// In case there is no Content-Length we cannot use keep-alive, + /// since there is no way to know when the server send all the + /// data, so "Connection: close" should be sent. + response.setKeepAlive(false); } + + if (add_cors_header) + response.set("Access-Control-Allow-Origin", "*"); + + setResponseDefaultHeaders(response); + + std::stringstream header; //STYLE_CHECK_ALLOW_STD_STRING_STREAM + response.beginWrite(header); + auto header_str = header.str(); + socketSendBytes(header_str.data(), header_str.size()); } void WriteBufferFromHTTPServerResponse::writeHeaderProgressImpl(const char * header_name) @@ -68,6 +74,7 @@ void WriteBufferFromHTTPServerResponse::writeExceptionCode() { if (headers_finished_sending || !exception_code) return; + if (headers_started_sending) { socketSendBytes("X-ClickHouse-Exception-Code: ", sizeof("X-ClickHouse-Exception-Code: ") - 1); @@ -129,10 +136,18 @@ void WriteBufferFromHTTPServerResponse::nextImpl() WriteBufferFromHTTPServerResponse::WriteBufferFromHTTPServerResponse( HTTPServerResponse & response_, bool is_http_method_head_, - const ProfileEvents::Event & write_event_) + const ProfileEvents::Event & write_event_, + const CompressionMethod & compression_method_, + bool add_cors_header_, + bool send_progress_, + size_t send_progress_interval_ms_) : HTTPWriteBuffer(response_.getSocket(), write_event_) , response(response_) , is_http_method_head(is_http_method_head_) + , add_cors_header(add_cors_header_) + , send_progress(send_progress_) + , send_progress_interval_ms(send_progress_interval_ms_) + , compression_method(compression_method_) { } diff --git a/src/Server/HTTP/WriteBufferFromHTTPServerResponse.h b/src/Server/HTTP/WriteBufferFromHTTPServerResponse.h index f0c80f24582..021ecaf3c0c 100644 --- a/src/Server/HTTP/WriteBufferFromHTTPServerResponse.h +++ b/src/Server/HTTP/WriteBufferFromHTTPServerResponse.h @@ -11,7 +11,6 @@ #include #include -#include namespace DB @@ -29,7 +28,11 @@ public: WriteBufferFromHTTPServerResponse( HTTPServerResponse & response_, bool is_http_method_head_, - const ProfileEvents::Event & write_event_ = ProfileEvents::end()); + const ProfileEvents::Event & write_event_ = ProfileEvents::end(), + const CompressionMethod & compression_method_ = CompressionMethod::None, + bool add_cors_header_ = false, + bool send_progress_ = false, + size_t send_progress_interval_ms_ = 100); ~WriteBufferFromHTTPServerResponse() override; @@ -60,6 +63,9 @@ public: void setExceptionCode(int exception_code_); + /// FIXME + void startSendHeadersPublic(); + private: /// Send at least HTTP headers if no data has been sent yet. /// Use after the data has possibly been sent and no error happened (and thus you do not plan @@ -69,7 +75,7 @@ private: /// Must be called under locked mutex. /// This method send headers, if this was not done already, - /// but not finish them with \r\n, allowing to send more headers subsequently. + /// but not finish them with \r\n, allowing to send more headers subsequently. void startSendHeaders(); /// Used to write the header X-ClickHouse-Progress / X-ClickHouse-Summary @@ -89,23 +95,23 @@ private: HTTPServerResponse & response; bool is_http_method_head; - bool add_cors_header = false; + bool add_cors_header; bool initialized = false; - bool headers_started_sending = false; - bool headers_finished_sending = false; /// If true, you could not add any headers. + std::atomic_bool headers_started_sending = false; + std::atomic_bool headers_finished_sending = false; /// If true, you could not add any headers. Progress accumulated_progress; - bool send_progress = false; - size_t send_progress_interval_ms = 100; + bool send_progress; + size_t send_progress_interval_ms; Stopwatch progress_watch; - CompressionMethod compression_method = CompressionMethod::None; + CompressionMethod compression_method; int exception_code = 0; - std::mutex mutex; /// progress callback could be called from different threads. + std::mutex mutex; /// progress callback could be called from different threads. }; } diff --git a/src/Server/HTTPHandler.cpp b/src/Server/HTTPHandler.cpp index d2bc22e98cc..eead407f5d9 100644 --- a/src/Server/HTTPHandler.cpp +++ b/src/Server/HTTPHandler.cpp @@ -261,18 +261,79 @@ void HTTPHandler::processQuery( bool enable_http_compression = params.getParsed("enable_http_compression", context->getSettingsRef().enable_http_compression); Int64 http_zlib_compression_level = params.getParsed("http_zlib_compression_level", context->getSettingsRef().http_zlib_compression_level); + auto chosen_compression_method = enable_http_compression ? http_response_compression_method : CompressionMethod::None; + + bool has_external_data = startsWith(request.getContentType(), "multipart/form-data"); + auto param_could_be_skipped = [&] (const String & name) + { + /// Empty parameter appears when URL like ?&a=b or a=b&&c=d. Just skip them for user's convenience. + if (name.empty()) + return true; + + /// Some parameters (database, default_format, everything used in the code above) do not + /// belong to the Settings class. + static const NameSet reserved_param_names{"compress", "decompress", "user", "password", "quota_key", "query_id", "stacktrace", "role", + "buffer_size", "wait_end_of_query", "session_id", "session_timeout", "session_check", "client_protocol_version", "close_session", + "database", "default_format"}; + + if (reserved_param_names.contains(name)) + return true; + + /// For external data we also want settings. + if (has_external_data) + { + /// Skip unneeded parameters to avoid confusing them later with context settings or query parameters. + /// It is a bug and ambiguity with `date_time_input_format` and `low_cardinality_allow_in_native_format` formats/settings. + static const Names reserved_param_suffixes = {"_format", "_types", "_structure"}; + for (const String & suffix : reserved_param_suffixes) + { + if (endsWith(name, suffix)) + return true; + } + } + + return false; + }; + /// Settings can be overridden in the query. + SettingsChanges settings_changes; + for (const auto & [key, value] : params) + { + if (!param_could_be_skipped(key)) + { + /// Other than query parameters are treated as settings. + if (!customizeQueryParam(context, key, value)) + settings_changes.push_back({key, value}); + } + } + + context->checkSettingsConstraints(settings_changes, SettingSource::QUERY); + context->applySettingsChanges(settings_changes); + const auto & settings = context->getSettingsRef(); + + /// Set the query id supplied by the user, if any, and also update the OpenTelemetry fields. + context->setCurrentQueryId(params.get("query_id", request.get("X-ClickHouse-Query-Id", ""))); + + /// Initialize query scope, once query_id is initialized. + /// (To track as much allocations as possible) + query_scope.emplace(context); + + bool enable_cors = settings.add_http_cors_header && !request.get("Origin", "").empty() && !config.has("http_options_response"); used_output.out_holder = std::make_shared( response, request.getMethod() == HTTPRequest::HTTP_HEAD, - write_event); + write_event, + chosen_compression_method, + enable_cors, + settings.send_progress_in_http_headers, + settings.http_headers_progress_interval_ms); + used_output.out = used_output.out_holder; used_output.out_maybe_compressed = used_output.out_holder; if (client_supports_http_compression && enable_http_compression) { - used_output.out_holder->setCompressionMethodHeader(http_response_compression_method); used_output.wrap_compressed_holder = wrapWriteBufferWithCompressionMethod( used_output.out_holder.get(), @@ -369,61 +430,61 @@ void HTTPHandler::processQuery( /// Anything else beside HTTP POST should be readonly queries. setReadOnlyIfHTTPMethodIdempotent(context, request.getMethod()); - bool has_external_data = startsWith(request.getContentType(), "multipart/form-data"); - - auto param_could_be_skipped = [&] (const String & name) - { - /// Empty parameter appears when URL like ?&a=b or a=b&&c=d. Just skip them for user's convenience. - if (name.empty()) - return true; - - /// Some parameters (database, default_format, everything used in the code above) do not - /// belong to the Settings class. - static const NameSet reserved_param_names{"compress", "decompress", "user", "password", "quota_key", "query_id", "stacktrace", "role", - "buffer_size", "wait_end_of_query", "session_id", "session_timeout", "session_check", "client_protocol_version", "close_session", - "database", "default_format"}; - - if (reserved_param_names.contains(name)) - return true; - - /// For external data we also want settings. - if (has_external_data) - { - /// Skip unneeded parameters to avoid confusing them later with context settings or query parameters. - /// It is a bug and ambiguity with `date_time_input_format` and `low_cardinality_allow_in_native_format` formats/settings. - static const Names reserved_param_suffixes = {"_format", "_types", "_structure"}; - for (const String & suffix : reserved_param_suffixes) - { - if (endsWith(name, suffix)) - return true; - } - } - - return false; - }; - - /// Settings can be overridden in the query. - SettingsChanges settings_changes; - for (const auto & [key, value] : params) - { - if (!param_could_be_skipped(key)) - { - /// Other than query parameters are treated as settings. - if (!customizeQueryParam(context, key, value)) - settings_changes.push_back({key, value}); - } - } - - context->checkSettingsConstraints(settings_changes, SettingSource::QUERY); - context->applySettingsChanges(settings_changes); - const auto & settings = context->getSettingsRef(); - - /// Set the query id supplied by the user, if any, and also update the OpenTelemetry fields. - context->setCurrentQueryId(params.get("query_id", request.get("X-ClickHouse-Query-Id", ""))); - - /// Initialize query scope, once query_id is initialized. - /// (To track as much allocations as possible) - query_scope.emplace(context); + // bool has_external_data = startsWith(request.getContentType(), "multipart/form-data"); + // + // auto param_could_be_skipped = [&] (const String & name) + // { + // /// Empty parameter appears when URL like ?&a=b or a=b&&c=d. Just skip them for user's convenience. + // if (name.empty()) + // return true; + // + // /// Some parameters (database, default_format, everything used in the code above) do not + // /// belong to the Settings class. + // static const NameSet reserved_param_names{"compress", "decompress", "user", "password", "quota_key", "query_id", "stacktrace", "role", + // "buffer_size", "wait_end_of_query", "session_id", "session_timeout", "session_check", "client_protocol_version", "close_session", + // "database", "default_format"}; + // + // if (reserved_param_names.contains(name)) + // return true; + // + // /// For external data we also want settings. + // if (has_external_data) + // { + // /// Skip unneeded parameters to avoid confusing them later with context settings or query parameters. + // /// It is a bug and ambiguity with `date_time_input_format` and `low_cardinality_allow_in_native_format` formats/settings. + // static const Names reserved_param_suffixes = {"_format", "_types", "_structure"}; + // for (const String & suffix : reserved_param_suffixes) + // { + // if (endsWith(name, suffix)) + // return true; + // } + // } + // + // return false; + // }; + // + // /// Settings can be overridden in the query. + // SettingsChanges settings_changes; + // for (const auto & [key, value] : params) + // { + // if (!param_could_be_skipped(key)) + // { + // /// Other than query parameters are treated as settings. + // if (!customizeQueryParam(context, key, value)) + // settings_changes.push_back({key, value}); + // } + // } + // + // context->checkSettingsConstraints(settings_changes, SettingSource::QUERY); + // context->applySettingsChanges(settings_changes); + // const auto & settings = context->getSettingsRef(); + // + // /// Set the query id supplied by the user, if any, and also update the OpenTelemetry fields. + // context->setCurrentQueryId(params.get("query_id", request.get("X-ClickHouse-Query-Id", ""))); + // + // /// Initialize query scope, once query_id is initialized. + // /// (To track as much allocations as possible) + // query_scope.emplace(context); /// NOTE: this may create pretty huge allocations that will not be accounted in trace_log, /// because memory_profiler_sample_probability/memory_profiler_step are not applied yet, @@ -431,8 +492,8 @@ void HTTPHandler::processQuery( const auto & query = getQuery(request, params, context); std::unique_ptr in_param = std::make_unique(query); - used_output.out_holder->setSendProgress(settings.send_progress_in_http_headers); - used_output.out_holder->setSendProgressInterval(settings.http_headers_progress_interval_ms); + // used_output.out_holder->setSendProgress(settings.send_progress_in_http_headers); + // used_output.out_holder->setSendProgressInterval(settings.http_headers_progress_interval_ms); /// If 'http_native_compression_disable_checksumming_on_decompress' setting is turned on, /// checksums of client data compressed with internal algorithm are not checked. @@ -442,8 +503,8 @@ void HTTPHandler::processQuery( /// Add CORS header if 'add_http_cors_header' setting is turned on send * in Access-Control-Allow-Origin /// Note that whether the header is added is determined by the settings, and we can only get the user settings after authentication. /// Once the authentication fails, the header can't be added. - if (settings.add_http_cors_header && !request.get("Origin", "").empty() && !config.has("http_options_response")) - used_output.out_holder->addHeaderCORS(true); + // if (settings.add_http_cors_header && !request.get("Origin", "").empty() && !config.has("http_options_response")) + // used_output.out_holder->addHeaderCORS(true); auto append_callback = [my_context = context] (ProgressCallback callback) { @@ -526,6 +587,10 @@ void HTTPHandler::processQuery( } }; + /// Start sending headers early, as it requires access to the internal + /// WriteBuffer, which may also be used concurrently by compressing wrapper. + used_output.out_holder->startSendHeadersPublic(); + executeQuery( *in, *used_output.out_maybe_delayed_and_compressed,