diff --git a/dbms/src/DataStreams/JSONRowOutputStream.cpp b/dbms/src/DataStreams/JSONRowOutputStream.cpp index 6794ebb52d6..9c43a8fa546 100644 --- a/dbms/src/DataStreams/JSONRowOutputStream.cpp +++ b/dbms/src/DataStreams/JSONRowOutputStream.cpp @@ -44,8 +44,6 @@ void JSONRowOutputStream::writePrefix() writeChar('\n', ostr); writeCString("\t\"data\":\n", ostr); writeCString("\t[\n", ostr); - - ostr.next(); } diff --git a/dbms/src/Server/HTTPHandler.cpp b/dbms/src/Server/HTTPHandler.cpp index 9f4f3690bb4..96434d0777e 100644 --- a/dbms/src/Server/HTTPHandler.cpp +++ b/dbms/src/Server/HTTPHandler.cpp @@ -11,7 +11,6 @@ #include #include #include -#include #include #include @@ -28,7 +27,7 @@ namespace DB { -void HTTPHandler::processQuery(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response) +void HTTPHandler::processQuery(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response, Output & used_output) { LOG_TRACE(log, "Request URI: " << request.getURI()); @@ -47,13 +46,12 @@ void HTTPHandler::processQuery(Poco::Net::HTTPServerRequest & request, Poco::Net query_param += '\n'; /// Если указано compress, то будем сжимать результат. - SharedPtr out = new WriteBufferFromHTTPServerResponse(response); - SharedPtr out_maybe_compressed; + used_output.out = new WriteBufferFromHTTPServerResponse(response); if (parse(params.get("compress", "0"))) - out_maybe_compressed = new CompressedWriteBuffer(*out); + used_output.out_maybe_compressed = new CompressedWriteBuffer(*used_output.out); else - out_maybe_compressed = out; + used_output.out_maybe_compressed = used_output.out; /// Имя пользователя и пароль могут быть заданы как в параметрах URL, так и с помощью HTTP Basic authentification (и то, и другое не секъюрно). std::string user = params.get("user", "default"); @@ -138,7 +136,7 @@ void HTTPHandler::processQuery(Poco::Net::HTTPServerRequest & request, Poco::Net context.getSettingsRef().limits.readonly = true; Stopwatch watch; - executeQuery(*in, *out_maybe_compressed, context, query_plan); + executeQuery(*in, *used_output.out_maybe_compressed, context, query_plan); watch.stop(); if (query_plan) @@ -167,11 +165,13 @@ void HTTPHandler::processQuery(Poco::Net::HTTPServerRequest & request, Poco::Net LOG_INFO(log, "Quota:\n" << quota.toString()); /// Если не было эксепшена и данные ещё не отправлены - отправляются HTTP заголовки с кодом 200. - out->finalize(); + used_output.out->finalize(); } -void HTTPHandler::trySendExceptionToClient(std::stringstream & s, Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response) +void HTTPHandler::trySendExceptionToClient(std::stringstream & s, + Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response, + Output & used_output) { try { @@ -186,8 +186,32 @@ void HTTPHandler::trySendExceptionToClient(std::stringstream & s, Poco::Net::HTT } response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_INTERNAL_SERVER_ERROR); - if (!response.sent()) + + if (!response.sent() && !used_output.out_maybe_compressed) + { + /// Ещё ничего не отправляли, и даже не знаем, нужно ли сжимать ответ. response.send() << s.str() << std::endl; + } + else if (used_output.out_maybe_compressed) + { + /** Отправим в использованный (возможно сжатый) поток сообщение об ошибке. + * Сообщение об ошибке может идти невпопад - после каких-то данных. + * Также стоит иметь ввиду, что мы могли уже отправить код 200. + */ + + /** Если данные есть в буфере, но их ещё не отправили, то и не будем отправлять */ + if (used_output.out->count() - used_output.out->offset() == 0) + { + used_output.out_maybe_compressed->position() = used_output.out_maybe_compressed->buffer().begin(); + used_output.out->position() = used_output.out->buffer().begin(); + } + + std::string exception_message = s.str(); + writeString(exception_message, *used_output.out_maybe_compressed); + writeChar('\n', *used_output.out_maybe_compressed); + used_output.out_maybe_compressed->next(); + used_output.out->finalize(); + } } catch (...) { @@ -198,6 +222,8 @@ void HTTPHandler::trySendExceptionToClient(std::stringstream & s, Poco::Net::HTT void HTTPHandler::handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response) { + Output used_output; + try { bool is_browser = false; @@ -215,7 +241,7 @@ void HTTPHandler::handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Ne if (request.getVersion() == Poco::Net::HTTPServerRequest::HTTP_1_1) response.setChunkedTransferEncoding(true); - processQuery(request, response); + processQuery(request, response, used_output); LOG_INFO(log, "Done processing query"); } catch (Exception & e) @@ -224,26 +250,26 @@ void HTTPHandler::handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Ne s << "Code: " << e.code() << ", e.displayText() = " << e.displayText() << ", e.what() = " << e.what(); LOG_ERROR(log, s.str()); - trySendExceptionToClient(s, request, response); + trySendExceptionToClient(s, request, response, used_output); } catch (Poco::Exception & e) { std::stringstream s; s << "Code: " << ErrorCodes::POCO_EXCEPTION << ", e.code() = " << e.code() << ", e.displayText() = " << e.displayText() << ", e.what() = " << e.what(); - trySendExceptionToClient(s, request, response); + trySendExceptionToClient(s, request, response, used_output); } catch (std::exception & e) { std::stringstream s; s << "Code: " << ErrorCodes::STD_EXCEPTION << ". " << e.what(); - trySendExceptionToClient(s, request, response); + trySendExceptionToClient(s, request, response, used_output); } catch (...) { std::stringstream s; s << "Code: " << ErrorCodes::UNKNOWN_EXCEPTION << ". Unknown exception."; - trySendExceptionToClient(s, request, response); + trySendExceptionToClient(s, request, response, used_output); } } diff --git a/dbms/src/Server/HTTPHandler.h b/dbms/src/Server/HTTPHandler.h index 03c917e57c3..7357c7f5831 100644 --- a/dbms/src/Server/HTTPHandler.h +++ b/dbms/src/Server/HTTPHandler.h @@ -1,5 +1,6 @@ #pragma once +#include #include "Server.h" @@ -16,15 +17,26 @@ public: { } + struct Output + { + SharedPtr out; + /// Используется для выдачи ответа. Равен либо out, либо CompressedWriteBuffer(*out), в зависимости от настроек. + SharedPtr out_maybe_compressed; + }; + void handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response); - void trySendExceptionToClient(std::stringstream & s, Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response); + + void trySendExceptionToClient(std::stringstream & s, + Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response, + Output & used_output); private: Server & server; Logger * log; - void processQuery(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response); + /// Функция также инициализирует used_output. + void processQuery(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response, Output & used_output); }; } diff --git a/dbms/src/Storages/StorageFactory.cpp b/dbms/src/Storages/StorageFactory.cpp index 7d8a1507e7b..c9d0fb6f809 100644 --- a/dbms/src/Storages/StorageFactory.cpp +++ b/dbms/src/Storages/StorageFactory.cpp @@ -189,7 +189,7 @@ StoragePtr StorageFactory::get( } else if (endsWith(name, "MergeTree")) { - /** Движки [Replicated][Summing|Collapsing]MergeTree (6 комбинаций) + /** Движки [Replicated][Summing|Collapsing|Aggregating|]MergeTree (8 комбинаций) * В качестве аргумента для движка должно быть указано: * - (для Replicated) Путь к таблице в ZooKeeper * - (для Replicated) Имя реплики в ZooKeeper