This commit is contained in:
Evgeniy Gatov 2014-08-05 12:49:35 +04:00
commit 34550c10c0
4 changed files with 56 additions and 20 deletions

View File

@ -44,8 +44,6 @@ void JSONRowOutputStream::writePrefix()
writeChar('\n', ostr);
writeCString("\t\"data\":\n", ostr);
writeCString("\t[\n", ostr);
ostr.next();
}

View File

@ -11,7 +11,6 @@
#include <DB/IO/ConcatReadBuffer.h>
#include <DB/IO/CompressedReadBuffer.h>
#include <DB/IO/CompressedWriteBuffer.h>
#include <DB/IO/WriteBufferFromHTTPServerResponse.h>
#include <DB/IO/WriteBufferFromString.h>
#include <DB/IO/WriteHelpers.h>
@ -28,7 +27,7 @@
namespace DB
{
void HTTPHandler::processQuery(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response)
void HTTPHandler::processQuery(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response, Output & used_output)
{
LOG_TRACE(log, "Request URI: " << request.getURI());
@ -47,13 +46,12 @@ void HTTPHandler::processQuery(Poco::Net::HTTPServerRequest & request, Poco::Net
query_param += '\n';
/// Если указано compress, то будем сжимать результат.
SharedPtr<WriteBufferFromHTTPServerResponse> out = new WriteBufferFromHTTPServerResponse(response);
SharedPtr<WriteBuffer> out_maybe_compressed;
used_output.out = new WriteBufferFromHTTPServerResponse(response);
if (parse<bool>(params.get("compress", "0")))
out_maybe_compressed = new CompressedWriteBuffer(*out);
used_output.out_maybe_compressed = new CompressedWriteBuffer(*used_output.out);
else
out_maybe_compressed = out;
used_output.out_maybe_compressed = used_output.out;
/// Имя пользователя и пароль могут быть заданы как в параметрах URL, так и с помощью HTTP Basic authentification (и то, и другое не секъюрно).
std::string user = params.get("user", "default");
@ -138,7 +136,7 @@ void HTTPHandler::processQuery(Poco::Net::HTTPServerRequest & request, Poco::Net
context.getSettingsRef().limits.readonly = true;
Stopwatch watch;
executeQuery(*in, *out_maybe_compressed, context, query_plan);
executeQuery(*in, *used_output.out_maybe_compressed, context, query_plan);
watch.stop();
if (query_plan)
@ -167,11 +165,13 @@ void HTTPHandler::processQuery(Poco::Net::HTTPServerRequest & request, Poco::Net
LOG_INFO(log, "Quota:\n" << quota.toString());
/// Если не было эксепшена и данные ещё не отправлены - отправляются HTTP заголовки с кодом 200.
out->finalize();
used_output.out->finalize();
}
void HTTPHandler::trySendExceptionToClient(std::stringstream & s, Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response)
void HTTPHandler::trySendExceptionToClient(std::stringstream & s,
Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response,
Output & used_output)
{
try
{
@ -186,8 +186,32 @@ void HTTPHandler::trySendExceptionToClient(std::stringstream & s, Poco::Net::HTT
}
response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_INTERNAL_SERVER_ERROR);
if (!response.sent())
if (!response.sent() && !used_output.out_maybe_compressed)
{
/// Ещё ничего не отправляли, и даже не знаем, нужно ли сжимать ответ.
response.send() << s.str() << std::endl;
}
else if (used_output.out_maybe_compressed)
{
/** Отправим в использованный (возможно сжатый) поток сообщение об ошибке.
* Сообщение об ошибке может идти невпопад - после каких-то данных.
* Также стоит иметь ввиду, что мы могли уже отправить код 200.
*/
/** Если данные есть в буфере, но их ещё не отправили, то и не будем отправлять */
if (used_output.out->count() - used_output.out->offset() == 0)
{
used_output.out_maybe_compressed->position() = used_output.out_maybe_compressed->buffer().begin();
used_output.out->position() = used_output.out->buffer().begin();
}
std::string exception_message = s.str();
writeString(exception_message, *used_output.out_maybe_compressed);
writeChar('\n', *used_output.out_maybe_compressed);
used_output.out_maybe_compressed->next();
used_output.out->finalize();
}
}
catch (...)
{
@ -198,6 +222,8 @@ void HTTPHandler::trySendExceptionToClient(std::stringstream & s, Poco::Net::HTT
void HTTPHandler::handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response)
{
Output used_output;
try
{
bool is_browser = false;
@ -215,7 +241,7 @@ void HTTPHandler::handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Ne
if (request.getVersion() == Poco::Net::HTTPServerRequest::HTTP_1_1)
response.setChunkedTransferEncoding(true);
processQuery(request, response);
processQuery(request, response, used_output);
LOG_INFO(log, "Done processing query");
}
catch (Exception & e)
@ -224,26 +250,26 @@ void HTTPHandler::handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Ne
s << "Code: " << e.code()
<< ", e.displayText() = " << e.displayText() << ", e.what() = " << e.what();
LOG_ERROR(log, s.str());
trySendExceptionToClient(s, request, response);
trySendExceptionToClient(s, request, response, used_output);
}
catch (Poco::Exception & e)
{
std::stringstream s;
s << "Code: " << ErrorCodes::POCO_EXCEPTION << ", e.code() = " << e.code()
<< ", e.displayText() = " << e.displayText() << ", e.what() = " << e.what();
trySendExceptionToClient(s, request, response);
trySendExceptionToClient(s, request, response, used_output);
}
catch (std::exception & e)
{
std::stringstream s;
s << "Code: " << ErrorCodes::STD_EXCEPTION << ". " << e.what();
trySendExceptionToClient(s, request, response);
trySendExceptionToClient(s, request, response, used_output);
}
catch (...)
{
std::stringstream s;
s << "Code: " << ErrorCodes::UNKNOWN_EXCEPTION << ". Unknown exception.";
trySendExceptionToClient(s, request, response);
trySendExceptionToClient(s, request, response, used_output);
}
}

View File

@ -1,5 +1,6 @@
#pragma once
#include <DB/IO/WriteBufferFromHTTPServerResponse.h>
#include "Server.h"
@ -16,15 +17,26 @@ public:
{
}
struct Output
{
SharedPtr<WriteBufferFromHTTPServerResponse> out;
/// Используется для выдачи ответа. Равен либо out, либо CompressedWriteBuffer(*out), в зависимости от настроек.
SharedPtr<WriteBuffer> out_maybe_compressed;
};
void handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response);
void trySendExceptionToClient(std::stringstream & s, Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response);
void trySendExceptionToClient(std::stringstream & s,
Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response,
Output & used_output);
private:
Server & server;
Logger * log;
void processQuery(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response);
/// Функция также инициализирует used_output.
void processQuery(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response, Output & used_output);
};
}

View File

@ -189,7 +189,7 @@ StoragePtr StorageFactory::get(
}
else if (endsWith(name, "MergeTree"))
{
/** Движки [Replicated][Summing|Collapsing]MergeTree (6 комбинаций)
/** Движки [Replicated][Summing|Collapsing|Aggregating|]MergeTree (8 комбинаций)
* В качестве аргумента для движка должно быть указано:
* - (для Replicated) Путь к таблице в ZooKeeper
* - (для Replicated) Имя реплики в ZooKeeper