#include #include #include #include #include #include #include #include #include #include #include #include namespace DB { namespace ErrorCodes { extern const int ABORTED; extern const int NOT_IMPLEMENTED; extern const int TOO_MANY_SIMULTANEOUS_QUERIES; extern const int WRONG_PASSWORD; } std::pair InterserverIOHTTPHandler::checkAuthentication(HTTPServerRequest & request) const { auto creds = server.context().getInterserverCredential(); if (!request.hasCredentials()) return creds->isValidUser(std::make_pair(default_user, default_password)); String scheme, info; request.getCredentials(scheme, info); if (scheme != "Basic") throw Exception("Server requires HTTP Basic authentication but client provides another method", ErrorCodes::NOT_IMPLEMENTED); Poco::Net::HTTPBasicCredentials credentials(info); return creds->isValidUser(std::make_pair(credentials.getUsername(), credentials.getPassword())); } void InterserverIOHTTPHandler::processQuery(HTTPServerRequest & request, HTTPServerResponse & response, Output & used_output) { HTMLForm params(request); LOG_TRACE(log, "Request URI: {}", request.getURI()); String endpoint_name = params.get("endpoint"); bool compress = params.get("compress") == "true"; auto & body = request.getStream(); auto endpoint = server.context().getInterserverIOHandler().getEndpoint(endpoint_name); /// Locked for read while query processing std::shared_lock lock(endpoint->rwlock); if (endpoint->blocker.isCancelled()) throw Exception("Transferring part to replica was cancelled", ErrorCodes::ABORTED); if (compress) { CompressedWriteBuffer compressed_out(*used_output.out); endpoint->processQuery(params, body, compressed_out, response); } else { endpoint->processQuery(params, body, *used_output.out, response); } } void InterserverIOHTTPHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse & response) { setThreadName("IntersrvHandler"); /// In order to work keep-alive. if (request.getVersion() == HTTPServerRequest::HTTP_1_1) response.setChunkedTransferEncoding(true); Output used_output; const auto & config = server.config(); unsigned keep_alive_timeout = config.getUInt("keep_alive_timeout", 10); used_output.out = std::make_shared( response, request.getMethod() == Poco::Net::HTTPRequest::HTTP_HEAD, keep_alive_timeout); auto write_response = [&](const std::string & message) { if (response.sent()) return; auto & out = *used_output.out; try { writeString(message, out); out.finalize(); } catch (...) { tryLogCurrentException(log); out.finalize(); } }; try { if (checkAuthentication(request)) { processQuery(request, response, used_output); used_output.out->finalize(); LOG_DEBUG(log, "Done processing query"); } else { response.setStatusAndReason(HTTPServerResponse::HTTP_UNAUTHORIZED); write_response(message); LOG_WARNING(log, "Query processing failed request: '{}' authentication failed", request.getURI()); } } catch (Exception & e) { if (e.code() == ErrorCodes::WRONG_PASSWORD) { response.setStatusAndReason(Poco::Net::HTTPServerResponse::HTTP_UNAUTHORIZED); if (!response.sent()) writeString("Unauthorized.", *used_output.out); LOG_WARNING(log, "Query processing failed request: '{}' authentication failed", request.getURI()); return; } if (e.code() == ErrorCodes::TOO_MANY_SIMULTANEOUS_QUERIES) return; response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_INTERNAL_SERVER_ERROR); /// Sending to remote server was cancelled due to server shutdown or drop table. bool is_real_error = e.code() != ErrorCodes::ABORTED; std::string message = getCurrentExceptionMessage(is_real_error); write_response(message); if (is_real_error) LOG_ERROR(log, message); else LOG_INFO(log, message); } catch (...) { response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_INTERNAL_SERVER_ERROR); std::string message = getCurrentExceptionMessage(false); write_response(message); LOG_ERROR(log, message); } } }