mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-19 16:20:50 +00:00
Merge c40dae940e
into 733c57dae7
This commit is contained in:
commit
e81800232f
@ -1,20 +1,27 @@
|
||||
#include <Server/HTTP/WriteBufferFromHTTPServerResponse.h>
|
||||
|
||||
#include <IO/HTTPCommon.h>
|
||||
#include <IO/Progress.h>
|
||||
#include <IO/WriteBufferFromString.h>
|
||||
#include <IO/WriteHelpers.h>
|
||||
#include <memory>
|
||||
#include <sstream>
|
||||
#include <string>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
void WriteBufferFromHTTPServerResponse::startSendHeadersPublic()
|
||||
{
|
||||
std::lock_guard lock(mutex);
|
||||
|
||||
startSendHeaders();
|
||||
}
|
||||
|
||||
void WriteBufferFromHTTPServerResponse::startSendHeaders()
|
||||
{
|
||||
if (!headers_started_sending)
|
||||
{
|
||||
if (headers_started_sending)
|
||||
return;
|
||||
|
||||
headers_started_sending = true;
|
||||
|
||||
if (response.getChunkedTransferEncoding())
|
||||
@ -36,7 +43,6 @@ void WriteBufferFromHTTPServerResponse::startSendHeaders()
|
||||
response.beginWrite(header);
|
||||
auto header_str = header.str();
|
||||
socketSendBytes(header_str.data(), header_str.size());
|
||||
}
|
||||
}
|
||||
|
||||
void WriteBufferFromHTTPServerResponse::writeHeaderProgressImpl(const char * header_name)
|
||||
@ -68,6 +74,7 @@ void WriteBufferFromHTTPServerResponse::writeExceptionCode()
|
||||
{
|
||||
if (headers_finished_sending || !exception_code)
|
||||
return;
|
||||
|
||||
if (headers_started_sending)
|
||||
{
|
||||
socketSendBytes("X-ClickHouse-Exception-Code: ", sizeof("X-ClickHouse-Exception-Code: ") - 1);
|
||||
@ -129,10 +136,18 @@ void WriteBufferFromHTTPServerResponse::nextImpl()
|
||||
WriteBufferFromHTTPServerResponse::WriteBufferFromHTTPServerResponse(
|
||||
HTTPServerResponse & response_,
|
||||
bool is_http_method_head_,
|
||||
const ProfileEvents::Event & write_event_)
|
||||
const ProfileEvents::Event & write_event_,
|
||||
const CompressionMethod & compression_method_,
|
||||
bool add_cors_header_,
|
||||
bool send_progress_,
|
||||
size_t send_progress_interval_ms_)
|
||||
: HTTPWriteBuffer(response_.getSocket(), write_event_)
|
||||
, response(response_)
|
||||
, is_http_method_head(is_http_method_head_)
|
||||
, add_cors_header(add_cors_header_)
|
||||
, send_progress(send_progress_)
|
||||
, send_progress_interval_ms(send_progress_interval_ms_)
|
||||
, compression_method(compression_method_)
|
||||
{
|
||||
}
|
||||
|
||||
|
@ -11,7 +11,6 @@
|
||||
#include <Common/Stopwatch.h>
|
||||
|
||||
#include <mutex>
|
||||
#include <optional>
|
||||
|
||||
|
||||
namespace DB
|
||||
@ -29,7 +28,11 @@ public:
|
||||
WriteBufferFromHTTPServerResponse(
|
||||
HTTPServerResponse & response_,
|
||||
bool is_http_method_head_,
|
||||
const ProfileEvents::Event & write_event_ = ProfileEvents::end());
|
||||
const ProfileEvents::Event & write_event_ = ProfileEvents::end(),
|
||||
const CompressionMethod & compression_method_ = CompressionMethod::None,
|
||||
bool add_cors_header_ = false,
|
||||
bool send_progress_ = false,
|
||||
size_t send_progress_interval_ms_ = 100);
|
||||
|
||||
~WriteBufferFromHTTPServerResponse() override;
|
||||
|
||||
@ -60,6 +63,9 @@ public:
|
||||
|
||||
void setExceptionCode(int exception_code_);
|
||||
|
||||
/// FIXME
|
||||
void startSendHeadersPublic();
|
||||
|
||||
private:
|
||||
/// Send at least HTTP headers if no data has been sent yet.
|
||||
/// Use after the data has possibly been sent and no error happened (and thus you do not plan
|
||||
@ -89,19 +95,19 @@ private:
|
||||
HTTPServerResponse & response;
|
||||
|
||||
bool is_http_method_head;
|
||||
bool add_cors_header = false;
|
||||
bool add_cors_header;
|
||||
|
||||
bool initialized = false;
|
||||
|
||||
bool headers_started_sending = false;
|
||||
bool headers_finished_sending = false; /// If true, you could not add any headers.
|
||||
std::atomic_bool headers_started_sending = false;
|
||||
std::atomic_bool headers_finished_sending = false; /// If true, you could not add any headers.
|
||||
|
||||
Progress accumulated_progress;
|
||||
bool send_progress = false;
|
||||
size_t send_progress_interval_ms = 100;
|
||||
bool send_progress;
|
||||
size_t send_progress_interval_ms;
|
||||
Stopwatch progress_watch;
|
||||
|
||||
CompressionMethod compression_method = CompressionMethod::None;
|
||||
CompressionMethod compression_method;
|
||||
|
||||
int exception_code = 0;
|
||||
|
||||
|
@ -261,18 +261,79 @@ void HTTPHandler::processQuery(
|
||||
|
||||
bool enable_http_compression = params.getParsed<bool>("enable_http_compression", context->getSettingsRef().enable_http_compression);
|
||||
Int64 http_zlib_compression_level = params.getParsed<Int64>("http_zlib_compression_level", context->getSettingsRef().http_zlib_compression_level);
|
||||
auto chosen_compression_method = enable_http_compression ? http_response_compression_method : CompressionMethod::None;
|
||||
|
||||
bool has_external_data = startsWith(request.getContentType(), "multipart/form-data");
|
||||
auto param_could_be_skipped = [&] (const String & name)
|
||||
{
|
||||
/// Empty parameter appears when URL like ?&a=b or a=b&&c=d. Just skip them for user's convenience.
|
||||
if (name.empty())
|
||||
return true;
|
||||
|
||||
/// Some parameters (database, default_format, everything used in the code above) do not
|
||||
/// belong to the Settings class.
|
||||
static const NameSet reserved_param_names{"compress", "decompress", "user", "password", "quota_key", "query_id", "stacktrace", "role",
|
||||
"buffer_size", "wait_end_of_query", "session_id", "session_timeout", "session_check", "client_protocol_version", "close_session",
|
||||
"database", "default_format"};
|
||||
|
||||
if (reserved_param_names.contains(name))
|
||||
return true;
|
||||
|
||||
/// For external data we also want settings.
|
||||
if (has_external_data)
|
||||
{
|
||||
/// Skip unneeded parameters to avoid confusing them later with context settings or query parameters.
|
||||
/// It is a bug and ambiguity with `date_time_input_format` and `low_cardinality_allow_in_native_format` formats/settings.
|
||||
static const Names reserved_param_suffixes = {"_format", "_types", "_structure"};
|
||||
for (const String & suffix : reserved_param_suffixes)
|
||||
{
|
||||
if (endsWith(name, suffix))
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
};
|
||||
/// Settings can be overridden in the query.
|
||||
SettingsChanges settings_changes;
|
||||
for (const auto & [key, value] : params)
|
||||
{
|
||||
if (!param_could_be_skipped(key))
|
||||
{
|
||||
/// Other than query parameters are treated as settings.
|
||||
if (!customizeQueryParam(context, key, value))
|
||||
settings_changes.push_back({key, value});
|
||||
}
|
||||
}
|
||||
|
||||
context->checkSettingsConstraints(settings_changes, SettingSource::QUERY);
|
||||
context->applySettingsChanges(settings_changes);
|
||||
const auto & settings = context->getSettingsRef();
|
||||
|
||||
/// Set the query id supplied by the user, if any, and also update the OpenTelemetry fields.
|
||||
context->setCurrentQueryId(params.get("query_id", request.get("X-ClickHouse-Query-Id", "")));
|
||||
|
||||
/// Initialize query scope, once query_id is initialized.
|
||||
/// (To track as much allocations as possible)
|
||||
query_scope.emplace(context);
|
||||
|
||||
bool enable_cors = settings.add_http_cors_header && !request.get("Origin", "").empty() && !config.has("http_options_response");
|
||||
|
||||
used_output.out_holder =
|
||||
std::make_shared<WriteBufferFromHTTPServerResponse>(
|
||||
response,
|
||||
request.getMethod() == HTTPRequest::HTTP_HEAD,
|
||||
write_event);
|
||||
write_event,
|
||||
chosen_compression_method,
|
||||
enable_cors,
|
||||
settings.send_progress_in_http_headers,
|
||||
settings.http_headers_progress_interval_ms);
|
||||
|
||||
used_output.out = used_output.out_holder;
|
||||
used_output.out_maybe_compressed = used_output.out_holder;
|
||||
|
||||
if (client_supports_http_compression && enable_http_compression)
|
||||
{
|
||||
used_output.out_holder->setCompressionMethodHeader(http_response_compression_method);
|
||||
used_output.wrap_compressed_holder =
|
||||
wrapWriteBufferWithCompressionMethod(
|
||||
used_output.out_holder.get(),
|
||||
@ -369,61 +430,61 @@ void HTTPHandler::processQuery(
|
||||
/// Anything else beside HTTP POST should be readonly queries.
|
||||
setReadOnlyIfHTTPMethodIdempotent(context, request.getMethod());
|
||||
|
||||
bool has_external_data = startsWith(request.getContentType(), "multipart/form-data");
|
||||
|
||||
auto param_could_be_skipped = [&] (const String & name)
|
||||
{
|
||||
/// Empty parameter appears when URL like ?&a=b or a=b&&c=d. Just skip them for user's convenience.
|
||||
if (name.empty())
|
||||
return true;
|
||||
|
||||
/// Some parameters (database, default_format, everything used in the code above) do not
|
||||
/// belong to the Settings class.
|
||||
static const NameSet reserved_param_names{"compress", "decompress", "user", "password", "quota_key", "query_id", "stacktrace", "role",
|
||||
"buffer_size", "wait_end_of_query", "session_id", "session_timeout", "session_check", "client_protocol_version", "close_session",
|
||||
"database", "default_format"};
|
||||
|
||||
if (reserved_param_names.contains(name))
|
||||
return true;
|
||||
|
||||
/// For external data we also want settings.
|
||||
if (has_external_data)
|
||||
{
|
||||
/// Skip unneeded parameters to avoid confusing them later with context settings or query parameters.
|
||||
/// It is a bug and ambiguity with `date_time_input_format` and `low_cardinality_allow_in_native_format` formats/settings.
|
||||
static const Names reserved_param_suffixes = {"_format", "_types", "_structure"};
|
||||
for (const String & suffix : reserved_param_suffixes)
|
||||
{
|
||||
if (endsWith(name, suffix))
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
};
|
||||
|
||||
/// Settings can be overridden in the query.
|
||||
SettingsChanges settings_changes;
|
||||
for (const auto & [key, value] : params)
|
||||
{
|
||||
if (!param_could_be_skipped(key))
|
||||
{
|
||||
/// Other than query parameters are treated as settings.
|
||||
if (!customizeQueryParam(context, key, value))
|
||||
settings_changes.push_back({key, value});
|
||||
}
|
||||
}
|
||||
|
||||
context->checkSettingsConstraints(settings_changes, SettingSource::QUERY);
|
||||
context->applySettingsChanges(settings_changes);
|
||||
const auto & settings = context->getSettingsRef();
|
||||
|
||||
/// Set the query id supplied by the user, if any, and also update the OpenTelemetry fields.
|
||||
context->setCurrentQueryId(params.get("query_id", request.get("X-ClickHouse-Query-Id", "")));
|
||||
|
||||
/// Initialize query scope, once query_id is initialized.
|
||||
/// (To track as much allocations as possible)
|
||||
query_scope.emplace(context);
|
||||
// bool has_external_data = startsWith(request.getContentType(), "multipart/form-data");
|
||||
//
|
||||
// auto param_could_be_skipped = [&] (const String & name)
|
||||
// {
|
||||
// /// Empty parameter appears when URL like ?&a=b or a=b&&c=d. Just skip them for user's convenience.
|
||||
// if (name.empty())
|
||||
// return true;
|
||||
//
|
||||
// /// Some parameters (database, default_format, everything used in the code above) do not
|
||||
// /// belong to the Settings class.
|
||||
// static const NameSet reserved_param_names{"compress", "decompress", "user", "password", "quota_key", "query_id", "stacktrace", "role",
|
||||
// "buffer_size", "wait_end_of_query", "session_id", "session_timeout", "session_check", "client_protocol_version", "close_session",
|
||||
// "database", "default_format"};
|
||||
//
|
||||
// if (reserved_param_names.contains(name))
|
||||
// return true;
|
||||
//
|
||||
// /// For external data we also want settings.
|
||||
// if (has_external_data)
|
||||
// {
|
||||
// /// Skip unneeded parameters to avoid confusing them later with context settings or query parameters.
|
||||
// /// It is a bug and ambiguity with `date_time_input_format` and `low_cardinality_allow_in_native_format` formats/settings.
|
||||
// static const Names reserved_param_suffixes = {"_format", "_types", "_structure"};
|
||||
// for (const String & suffix : reserved_param_suffixes)
|
||||
// {
|
||||
// if (endsWith(name, suffix))
|
||||
// return true;
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// return false;
|
||||
// };
|
||||
//
|
||||
// /// Settings can be overridden in the query.
|
||||
// SettingsChanges settings_changes;
|
||||
// for (const auto & [key, value] : params)
|
||||
// {
|
||||
// if (!param_could_be_skipped(key))
|
||||
// {
|
||||
// /// Other than query parameters are treated as settings.
|
||||
// if (!customizeQueryParam(context, key, value))
|
||||
// settings_changes.push_back({key, value});
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// context->checkSettingsConstraints(settings_changes, SettingSource::QUERY);
|
||||
// context->applySettingsChanges(settings_changes);
|
||||
// const auto & settings = context->getSettingsRef();
|
||||
//
|
||||
// /// Set the query id supplied by the user, if any, and also update the OpenTelemetry fields.
|
||||
// context->setCurrentQueryId(params.get("query_id", request.get("X-ClickHouse-Query-Id", "")));
|
||||
//
|
||||
// /// Initialize query scope, once query_id is initialized.
|
||||
// /// (To track as much allocations as possible)
|
||||
// query_scope.emplace(context);
|
||||
|
||||
/// NOTE: this may create pretty huge allocations that will not be accounted in trace_log,
|
||||
/// because memory_profiler_sample_probability/memory_profiler_step are not applied yet,
|
||||
@ -431,8 +492,8 @@ void HTTPHandler::processQuery(
|
||||
const auto & query = getQuery(request, params, context);
|
||||
std::unique_ptr<ReadBuffer> in_param = std::make_unique<ReadBufferFromString>(query);
|
||||
|
||||
used_output.out_holder->setSendProgress(settings.send_progress_in_http_headers);
|
||||
used_output.out_holder->setSendProgressInterval(settings.http_headers_progress_interval_ms);
|
||||
// used_output.out_holder->setSendProgress(settings.send_progress_in_http_headers);
|
||||
// used_output.out_holder->setSendProgressInterval(settings.http_headers_progress_interval_ms);
|
||||
|
||||
/// If 'http_native_compression_disable_checksumming_on_decompress' setting is turned on,
|
||||
/// checksums of client data compressed with internal algorithm are not checked.
|
||||
@ -442,8 +503,8 @@ void HTTPHandler::processQuery(
|
||||
/// Add CORS header if 'add_http_cors_header' setting is turned on send * in Access-Control-Allow-Origin
|
||||
/// Note that whether the header is added is determined by the settings, and we can only get the user settings after authentication.
|
||||
/// Once the authentication fails, the header can't be added.
|
||||
if (settings.add_http_cors_header && !request.get("Origin", "").empty() && !config.has("http_options_response"))
|
||||
used_output.out_holder->addHeaderCORS(true);
|
||||
// if (settings.add_http_cors_header && !request.get("Origin", "").empty() && !config.has("http_options_response"))
|
||||
// used_output.out_holder->addHeaderCORS(true);
|
||||
|
||||
auto append_callback = [my_context = context] (ProgressCallback callback)
|
||||
{
|
||||
@ -481,7 +542,7 @@ void HTTPHandler::processQuery(
|
||||
|
||||
applyHTTPResponseHeaders(response, http_response_headers_override);
|
||||
|
||||
auto set_query_result = [&response, this] (const QueryResultDetails & details)
|
||||
auto set_query_result = [&response, &used_output, this] (const QueryResultDetails & details)
|
||||
{
|
||||
response.add("X-ClickHouse-Query-Id", details.query_id);
|
||||
|
||||
@ -494,6 +555,10 @@ void HTTPHandler::processQuery(
|
||||
|
||||
if (details.timezone)
|
||||
response.add("X-ClickHouse-Timezone", *details.timezone);
|
||||
|
||||
/// Start sending headers early, as it requires access to the internal
|
||||
/// WriteBuffer, which may also be used concurrently by compressing wrapper.
|
||||
used_output.out_holder->startSendHeadersPublic();
|
||||
};
|
||||
|
||||
auto handle_exception_in_output_format = [&](IOutputFormat & current_output_format, const String & format_name, const ContextPtr & context_, const std::optional<FormatSettings> & format_settings)
|
||||
|
Loading…
Reference in New Issue
Block a user