temp-commit

This commit is contained in:
Konstantin Bogdanov 2024-09-13 17:58:29 +02:00
parent 2fce90ab76
commit e3dd7b6668
No known key found for this signature in database
3 changed files with 181 additions and 95 deletions

View File

@ -1,20 +1,27 @@
#include <Server/HTTP/WriteBufferFromHTTPServerResponse.h>
#include <IO/HTTPCommon.h>
#include <IO/Progress.h>
#include <IO/WriteBufferFromString.h>
#include <IO/WriteHelpers.h>
#include <memory>
#include <sstream>
#include <string>
namespace DB
{
void WriteBufferFromHTTPServerResponse::startSendHeadersPublic()
{
std::lock_guard lock(mutex);
startSendHeaders();
}
void WriteBufferFromHTTPServerResponse::startSendHeaders()
{
if (!headers_started_sending)
{
if (headers_started_sending)
return;
headers_started_sending = true;
if (response.getChunkedTransferEncoding())
@ -36,7 +43,6 @@ void WriteBufferFromHTTPServerResponse::startSendHeaders()
response.beginWrite(header);
auto header_str = header.str();
socketSendBytes(header_str.data(), header_str.size());
}
}
void WriteBufferFromHTTPServerResponse::writeHeaderProgressImpl(const char * header_name)
@ -68,6 +74,7 @@ void WriteBufferFromHTTPServerResponse::writeExceptionCode()
{
if (headers_finished_sending || !exception_code)
return;
if (headers_started_sending)
{
socketSendBytes("X-ClickHouse-Exception-Code: ", sizeof("X-ClickHouse-Exception-Code: ") - 1);
@ -129,10 +136,18 @@ void WriteBufferFromHTTPServerResponse::nextImpl()
WriteBufferFromHTTPServerResponse::WriteBufferFromHTTPServerResponse(
HTTPServerResponse & response_,
bool is_http_method_head_,
const ProfileEvents::Event & write_event_)
const ProfileEvents::Event & write_event_,
const CompressionMethod & compression_method_,
bool add_cors_header_,
bool send_progress_,
size_t send_progress_interval_ms_)
: HTTPWriteBuffer(response_.getSocket(), write_event_)
, response(response_)
, is_http_method_head(is_http_method_head_)
, add_cors_header(add_cors_header_)
, send_progress(send_progress_)
, send_progress_interval_ms(send_progress_interval_ms_)
, compression_method(compression_method_)
{
}

View File

@ -11,7 +11,6 @@
#include <Common/Stopwatch.h>
#include <mutex>
#include <optional>
namespace DB
@ -29,7 +28,11 @@ public:
WriteBufferFromHTTPServerResponse(
HTTPServerResponse & response_,
bool is_http_method_head_,
const ProfileEvents::Event & write_event_ = ProfileEvents::end());
const ProfileEvents::Event & write_event_ = ProfileEvents::end(),
const CompressionMethod & compression_method_ = CompressionMethod::None,
bool add_cors_header_ = false,
bool send_progress_ = false,
size_t send_progress_interval_ms_ = 100);
~WriteBufferFromHTTPServerResponse() override;
@ -60,6 +63,9 @@ public:
void setExceptionCode(int exception_code_);
/// FIXME
void startSendHeadersPublic();
private:
/// Send at least HTTP headers if no data has been sent yet.
/// Use after the data has possibly been sent and no error happened (and thus you do not plan
@ -89,19 +95,19 @@ private:
HTTPServerResponse & response;
bool is_http_method_head;
bool add_cors_header = false;
bool add_cors_header;
bool initialized = false;
bool headers_started_sending = false;
bool headers_finished_sending = false; /// If true, you could not add any headers.
std::atomic_bool headers_started_sending = false;
std::atomic_bool headers_finished_sending = false; /// If true, you could not add any headers.
Progress accumulated_progress;
bool send_progress = false;
size_t send_progress_interval_ms = 100;
bool send_progress;
size_t send_progress_interval_ms;
Stopwatch progress_watch;
CompressionMethod compression_method = CompressionMethod::None;
CompressionMethod compression_method;
int exception_code = 0;

View File

@ -261,18 +261,79 @@ void HTTPHandler::processQuery(
bool enable_http_compression = params.getParsed<bool>("enable_http_compression", context->getSettingsRef().enable_http_compression);
Int64 http_zlib_compression_level = params.getParsed<Int64>("http_zlib_compression_level", context->getSettingsRef().http_zlib_compression_level);
auto chosen_compression_method = enable_http_compression ? http_response_compression_method : CompressionMethod::None;
bool has_external_data = startsWith(request.getContentType(), "multipart/form-data");
auto param_could_be_skipped = [&] (const String & name)
{
/// Empty parameter appears when URL like ?&a=b or a=b&&c=d. Just skip them for user's convenience.
if (name.empty())
return true;
/// Some parameters (database, default_format, everything used in the code above) do not
/// belong to the Settings class.
static const NameSet reserved_param_names{"compress", "decompress", "user", "password", "quota_key", "query_id", "stacktrace", "role",
"buffer_size", "wait_end_of_query", "session_id", "session_timeout", "session_check", "client_protocol_version", "close_session",
"database", "default_format"};
if (reserved_param_names.contains(name))
return true;
/// For external data we also want settings.
if (has_external_data)
{
/// Skip unneeded parameters to avoid confusing them later with context settings or query parameters.
/// It is a bug and ambiguity with `date_time_input_format` and `low_cardinality_allow_in_native_format` formats/settings.
static const Names reserved_param_suffixes = {"_format", "_types", "_structure"};
for (const String & suffix : reserved_param_suffixes)
{
if (endsWith(name, suffix))
return true;
}
}
return false;
};
/// Settings can be overridden in the query.
SettingsChanges settings_changes;
for (const auto & [key, value] : params)
{
if (!param_could_be_skipped(key))
{
/// Other than query parameters are treated as settings.
if (!customizeQueryParam(context, key, value))
settings_changes.push_back({key, value});
}
}
context->checkSettingsConstraints(settings_changes, SettingSource::QUERY);
context->applySettingsChanges(settings_changes);
const auto & settings = context->getSettingsRef();
/// Set the query id supplied by the user, if any, and also update the OpenTelemetry fields.
context->setCurrentQueryId(params.get("query_id", request.get("X-ClickHouse-Query-Id", "")));
/// Initialize query scope, once query_id is initialized.
/// (To track as much allocations as possible)
query_scope.emplace(context);
bool enable_cors = settings.add_http_cors_header && !request.get("Origin", "").empty() && !config.has("http_options_response");
used_output.out_holder =
std::make_shared<WriteBufferFromHTTPServerResponse>(
response,
request.getMethod() == HTTPRequest::HTTP_HEAD,
write_event);
write_event,
chosen_compression_method,
enable_cors,
settings.send_progress_in_http_headers,
settings.http_headers_progress_interval_ms);
used_output.out = used_output.out_holder;
used_output.out_maybe_compressed = used_output.out_holder;
if (client_supports_http_compression && enable_http_compression)
{
used_output.out_holder->setCompressionMethodHeader(http_response_compression_method);
used_output.wrap_compressed_holder =
wrapWriteBufferWithCompressionMethod(
used_output.out_holder.get(),
@ -369,61 +430,61 @@ void HTTPHandler::processQuery(
/// Anything else beside HTTP POST should be readonly queries.
setReadOnlyIfHTTPMethodIdempotent(context, request.getMethod());
bool has_external_data = startsWith(request.getContentType(), "multipart/form-data");
auto param_could_be_skipped = [&] (const String & name)
{
/// Empty parameter appears when URL like ?&a=b or a=b&&c=d. Just skip them for user's convenience.
if (name.empty())
return true;
/// Some parameters (database, default_format, everything used in the code above) do not
/// belong to the Settings class.
static const NameSet reserved_param_names{"compress", "decompress", "user", "password", "quota_key", "query_id", "stacktrace", "role",
"buffer_size", "wait_end_of_query", "session_id", "session_timeout", "session_check", "client_protocol_version", "close_session",
"database", "default_format"};
if (reserved_param_names.contains(name))
return true;
/// For external data we also want settings.
if (has_external_data)
{
/// Skip unneeded parameters to avoid confusing them later with context settings or query parameters.
/// It is a bug and ambiguity with `date_time_input_format` and `low_cardinality_allow_in_native_format` formats/settings.
static const Names reserved_param_suffixes = {"_format", "_types", "_structure"};
for (const String & suffix : reserved_param_suffixes)
{
if (endsWith(name, suffix))
return true;
}
}
return false;
};
/// Settings can be overridden in the query.
SettingsChanges settings_changes;
for (const auto & [key, value] : params)
{
if (!param_could_be_skipped(key))
{
/// Other than query parameters are treated as settings.
if (!customizeQueryParam(context, key, value))
settings_changes.push_back({key, value});
}
}
context->checkSettingsConstraints(settings_changes, SettingSource::QUERY);
context->applySettingsChanges(settings_changes);
const auto & settings = context->getSettingsRef();
/// Set the query id supplied by the user, if any, and also update the OpenTelemetry fields.
context->setCurrentQueryId(params.get("query_id", request.get("X-ClickHouse-Query-Id", "")));
/// Initialize query scope, once query_id is initialized.
/// (To track as much allocations as possible)
query_scope.emplace(context);
// bool has_external_data = startsWith(request.getContentType(), "multipart/form-data");
//
// auto param_could_be_skipped = [&] (const String & name)
// {
// /// Empty parameter appears when URL like ?&a=b or a=b&&c=d. Just skip them for user's convenience.
// if (name.empty())
// return true;
//
// /// Some parameters (database, default_format, everything used in the code above) do not
// /// belong to the Settings class.
// static const NameSet reserved_param_names{"compress", "decompress", "user", "password", "quota_key", "query_id", "stacktrace", "role",
// "buffer_size", "wait_end_of_query", "session_id", "session_timeout", "session_check", "client_protocol_version", "close_session",
// "database", "default_format"};
//
// if (reserved_param_names.contains(name))
// return true;
//
// /// For external data we also want settings.
// if (has_external_data)
// {
// /// Skip unneeded parameters to avoid confusing them later with context settings or query parameters.
// /// It is a bug and ambiguity with `date_time_input_format` and `low_cardinality_allow_in_native_format` formats/settings.
// static const Names reserved_param_suffixes = {"_format", "_types", "_structure"};
// for (const String & suffix : reserved_param_suffixes)
// {
// if (endsWith(name, suffix))
// return true;
// }
// }
//
// return false;
// };
//
// /// Settings can be overridden in the query.
// SettingsChanges settings_changes;
// for (const auto & [key, value] : params)
// {
// if (!param_could_be_skipped(key))
// {
// /// Other than query parameters are treated as settings.
// if (!customizeQueryParam(context, key, value))
// settings_changes.push_back({key, value});
// }
// }
//
// context->checkSettingsConstraints(settings_changes, SettingSource::QUERY);
// context->applySettingsChanges(settings_changes);
// const auto & settings = context->getSettingsRef();
//
// /// Set the query id supplied by the user, if any, and also update the OpenTelemetry fields.
// context->setCurrentQueryId(params.get("query_id", request.get("X-ClickHouse-Query-Id", "")));
//
// /// Initialize query scope, once query_id is initialized.
// /// (To track as much allocations as possible)
// query_scope.emplace(context);
/// NOTE: this may create pretty huge allocations that will not be accounted in trace_log,
/// because memory_profiler_sample_probability/memory_profiler_step are not applied yet,
@ -431,8 +492,8 @@ void HTTPHandler::processQuery(
const auto & query = getQuery(request, params, context);
std::unique_ptr<ReadBuffer> in_param = std::make_unique<ReadBufferFromString>(query);
used_output.out_holder->setSendProgress(settings.send_progress_in_http_headers);
used_output.out_holder->setSendProgressInterval(settings.http_headers_progress_interval_ms);
// used_output.out_holder->setSendProgress(settings.send_progress_in_http_headers);
// used_output.out_holder->setSendProgressInterval(settings.http_headers_progress_interval_ms);
/// If 'http_native_compression_disable_checksumming_on_decompress' setting is turned on,
/// checksums of client data compressed with internal algorithm are not checked.
@ -442,8 +503,8 @@ void HTTPHandler::processQuery(
/// Add CORS header if 'add_http_cors_header' setting is turned on send * in Access-Control-Allow-Origin
/// Note that whether the header is added is determined by the settings, and we can only get the user settings after authentication.
/// Once the authentication fails, the header can't be added.
if (settings.add_http_cors_header && !request.get("Origin", "").empty() && !config.has("http_options_response"))
used_output.out_holder->addHeaderCORS(true);
// if (settings.add_http_cors_header && !request.get("Origin", "").empty() && !config.has("http_options_response"))
// used_output.out_holder->addHeaderCORS(true);
auto append_callback = [my_context = context] (ProgressCallback callback)
{
@ -526,6 +587,10 @@ void HTTPHandler::processQuery(
}
};
/// Start sending headers early, as it requires access to the internal
/// WriteBuffer, which may also be used concurrently by compressing wrapper.
used_output.out_holder->startSendHeadersPublic();
executeQuery(
*in,
*used_output.out_maybe_delayed_and_compressed,