ClickHouse/src/Server/InterserverIOHTTPHandler.cpp

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

173 lines
5.1 KiB
C++
Raw Normal View History

#include <Server/InterserverIOHTTPHandler.h>
#include <Server/IServer.h>
2018-12-28 18:15:26 +00:00
#include <Compression/CompressedWriteBuffer.h>
#include <Core/ServerSettings.h>
#include <IO/ReadBufferFromIStream.h>
#include <Interpreters/Context.h>
#include <Interpreters/InterserverIOHandler.h>
#include <Server/HTTP/HTMLForm.h>
#include <Server/HTTP/WriteBufferFromHTTPServerResponse.h>
2022-04-27 15:05:45 +00:00
#include <Common/logger_useful.h>
#include <Common/setThreadName.h>
#include <Poco/Net/HTTPBasicCredentials.h>
#include <Poco/Util/LayeredConfiguration.h>
2014-03-21 13:42:14 +00:00
namespace DB
{
2016-01-12 02:21:15 +00:00
/// Error codes referenced by this translation unit; they are only declared here
/// (`extern`), so their definitions must be provided elsewhere.
namespace ErrorCodes
{
    extern const int TOO_MANY_SIMULTANEOUS_QUERIES;
    extern const int ABORTED;
}
/// Checks the credentials carried by an interserver HTTP request against the
/// interserver credentials obtained from the server context.
///
/// Returns a pair {message, success}:
///  - no credentials configured and none sent by the client: {"", true};
///  - no credentials configured but the client sent some: failure with an explanatory message;
///  - credentials configured and the client sent none: the result of validating an empty user/password;
///  - credentials configured and the client used a scheme other than "Basic": failure with an explanatory message;
///  - otherwise: the result of validating the user/password decoded from the Basic credentials.
std::pair<String, bool> InterserverIOHTTPHandler::checkAuthentication(HTTPServerRequest & request) const
{
    const auto configured_credentials = server.context()->getInterserverCredentials();
    const bool client_sent_credentials = request.hasCredentials();

    /// The server has no interserver credentials: only clients that send none are accepted.
    if (!configured_credentials)
    {
        if (client_sent_credentials)
            return {"Client requires HTTP Basic authentication, but server doesn't provide it", false};

        return {"", true};
    }

    /// The server has credentials but the client sent none: validate an empty user/password pair.
    if (!client_sent_credentials)
        return configured_credentials->isValidUser("", "");

    String auth_scheme;
    String auth_info;
    request.getCredentials(auth_scheme, auth_info);

    /// Note: the scheme name is compared exactly (case-sensitively).
    if (auth_scheme != "Basic")
        return {"Server requires HTTP Basic authentication but client provides another method", false};

    Poco::Net::HTTPBasicCredentials basic_credentials(auth_info);
    return configured_credentials->isValidUser(basic_credentials.getUsername(), basic_credentials.getPassword());
}
/// Dispatches one interserver request to the endpoint named by its `endpoint` parameter.
///
/// The request parameters are parsed into an HTMLForm; `endpoint` selects the handler and
/// `compress` (exactly the string "true") selects whether the endpoint writes through a
/// CompressedWriteBuffer layered on top of `used_output.out`.
///
/// Throws Exception with code ABORTED if the endpoint's blocker reports cancellation.
/// Anything thrown by parameter lookup, endpoint lookup or the endpoint itself propagates to the caller.
/// The underlying `used_output.out` is NOT finalized here; that is left to the caller.
void InterserverIOHTTPHandler::processQuery(HTTPServerRequest & request, HTTPServerResponse & response, Output & used_output)
{
    HTMLForm params(server.context()->getSettingsRef(), request);

    LOG_TRACE(log, "Request URI: {}", request.getURI());

    String endpoint_name = params.get("endpoint");
    bool compress = params.get("compress") == "true";

    auto & body = request.getStream();

    auto endpoint = server.context()->getInterserverIOHandler().getEndpoint(endpoint_name);

    /// Locked for read while query processing
    std::shared_lock lock(endpoint->rwlock);
    if (endpoint->blocker.isCancelled())
        throw Exception(ErrorCodes::ABORTED, "Transferring part to replica was cancelled");

    if (compress)
    {
        CompressedWriteBuffer compressed_out(*used_output.out);
        endpoint->processQuery(params, body, compressed_out, response);

        /// Finalize explicitly on the success path, so that a failure while flushing the remaining
        /// compressed data is thrown from here as a regular exception instead of arising while
        /// the buffer is being destroyed, where it cannot be propagated to the caller.
        /// NOTE(review): assumes finalize() is safe to follow with destruction of the buffer
        /// (the same finalize-then-destroy pattern is used for `used_output.out` by the caller).
        compressed_out.finalize();
    }
    else
    {
        endpoint->processQuery(params, body, *used_output.out, response);
    }
}
/// Entry point for one interserver HTTP request.
///
/// Flow:
///  1. Name the thread, enable chunked transfer encoding for HTTP/1.1 requests and create the
///     response write buffer (`used_output.out`).
///  2. Authenticate the request; on failure reply with 401 and the authentication message.
///  3. On success run processQuery() and finalize the response buffer.
///  4. On an exception reply with 500 and the exception message, except for
///     TOO_MANY_SIMULTANEOUS_QUERIES, where the buffer is only finalized and no status or message
///     is set by this function.
///
/// Every finalization of the response buffer goes through `finalize_output`, which logs instead of
/// throwing, so a failure to finalize does not escape from a catch handler of this function.
void InterserverIOHTTPHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse & response, const ProfileEvents::Event & write_event)
{
    setThreadName("IntersrvHandler");

    /// In order to work keep-alive.
    if (request.getVersion() == HTTPServerRequest::HTTP_1_1)
        response.setChunkedTransferEncoding(true);

    Output used_output;
    used_output.out = std::make_shared<WriteBufferFromHTTPServerResponse>(
        response, request.getMethod() == Poco::Net::HTTPRequest::HTTP_HEAD, write_event);

    /// Finalizes the response buffer; failures are logged rather than propagated.
    auto finalize_output = [&]
    {
        try
        {
            used_output.out->finalize();
        }
        catch (...)
        {
            tryLogCurrentException(log, "Failed to finalize response write buffer");
        }
    };

    /// Writes `message` as the response body (only if the response has not been sent yet)
    /// and then finalizes the buffer in every case. A failed write is logged, not propagated.
    auto write_response = [&](const std::string & message)
    {
        if (!response.sent())
        {
            try
            {
                writeString(message, *used_output.out);
            }
            catch (...)
            {
                tryLogCurrentException(log);
            }
        }

        finalize_output();
    };

    try
    {
        if (auto [message, success] = checkAuthentication(request); success)
        {
            processQuery(request, response, used_output);
            finalize_output();
            LOG_DEBUG(log, "Done processing query");
        }
        else
        {
            response.setStatusAndReason(HTTPServerResponse::HTTP_UNAUTHORIZED);
            write_response(message);
            LOG_WARNING(log, "Query processing failed request: '{}' authentication failed", request.getURI());
        }
    }
    catch (Exception & e)
    {
        if (e.code() == ErrorCodes::TOO_MANY_SIMULTANEOUS_QUERIES)
        {
            /// No status or message is set here for this error code.
            /// Use the guarded finalizer: a throw from finalize() inside this handler
            /// would otherwise escape handleRequest() without being logged.
            finalize_output();
            return;
        }

        response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_INTERNAL_SERVER_ERROR);

        /// Sending to remote server was cancelled due to server shutdown or drop table.
        bool is_real_error = e.code() != ErrorCodes::ABORTED;

        PreformattedMessage message = getCurrentExceptionMessageAndPattern(is_real_error);
        write_response(message.text);

        if (is_real_error)
            LOG_ERROR(log, message);
        else
            LOG_INFO(log, message);
    }
    catch (...)
    {
        response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_INTERNAL_SERVER_ERROR);
        PreformattedMessage message = getCurrentExceptionMessageAndPattern(/* with_stacktrace */ false);
        write_response(message.text);
        LOG_ERROR(log, message);
    }
}
}