dbms: fixed error [#CONV-2944].

This commit is contained in:
Alexey Milovidov 2012-12-06 17:32:48 +00:00
parent 2b9f4ca2bf
commit 8a053aba54
3 changed files with 41 additions and 20 deletions

View File

@ -144,11 +144,13 @@ public:
finish = true;
cancel();
ExceptionPtr exception;
/// Вынем всё, что есть в очереди готовых данных.
OutputData res;
while (output_queue.tryPop(res))
if (res.exception && !std::uncaught_exception())
res.exception->rethrow();
if (res.exception && !exception)
exception = res.exception;
/** В этот момент, запоздавшие потоки ещё могут вставить в очередь какие-нибудь блоки, но очередь не переполнится.
* PS. Может быть, для переменной finish нужен барьер?
@ -159,8 +161,11 @@ public:
/// Может быть, нам под конец положили эксепшен.
while (output_queue.tryPop(res))
if (res.exception && !std::uncaught_exception())
res.exception->rethrow();
if (res.exception && !exception)
exception = res.exception;
if (exception && !std::uncaught_exception())
exception->rethrow();
LOG_TRACE(log, "Waited for threads to finish");
}

View File

@ -68,6 +68,11 @@ void TCPHandler::runImpl()
Stopwatch watch;
state.reset();
/** Исключение во время выполнения запроса (его надо отдать по сети клиенту).
* Клиент сможет его принять, если оно не произошло во время отправки другого пакета.
*/
ExceptionPtr exception;
try
{
@ -90,12 +95,14 @@ void TCPHandler::runImpl()
processOrdinaryQuery();
sendEndOfStream();
state.reset();
}
catch (DB::Exception & e)
{
LOG_ERROR(log, "DB::Exception. Code: " << e.code() << ", e.displayText() = " << e.displayText() << ", e.what() = " << e.what()
<< ", Stack trace:\n\n" << e.getStackTrace().toString());
state.exception = e.clone();
exception = e.clone();
if (e.code() == ErrorCodes::UNKNOWN_PACKET_FROM_CLIENT)
throw;
@ -104,23 +111,37 @@ void TCPHandler::runImpl()
{
LOG_ERROR(log, "Poco::Exception. Code: " << ErrorCodes::POCO_EXCEPTION << ", e.code() = " << e.code()
<< ", e.displayText() = " << e.displayText() << ", e.what() = " << e.what());
state.exception = new Exception(e.displayText(), e.code());
exception = new Exception(e.displayText(), e.code());
}
catch (std::exception & e)
{
LOG_ERROR(log, "std::exception. Code: " << ErrorCodes::STD_EXCEPTION << ", e.what() = " << e.what());
state.exception = new Exception(e.what(), ErrorCodes::STD_EXCEPTION);
exception = new Exception(e.what(), ErrorCodes::STD_EXCEPTION);
}
catch (...)
{
LOG_ERROR(log, "Unknown exception. Code: " << ErrorCodes::UNKNOWN_EXCEPTION);
state.exception = new Exception("Unknown exception", ErrorCodes::UNKNOWN_EXCEPTION);
exception = new Exception("Unknown exception", ErrorCodes::UNKNOWN_EXCEPTION);
}
if (state.exception)
sendException(*state.exception);
if (exception)
{
sendException(*exception);
try
{
state.reset();
}
catch (...)
{
/** В процессе обработки запроса было исключение, которое мы поймали и отправили клиенту.
* При уничтожении конвейера выполнения запроса, было второе исключение.
* Например, конвейер мог выполняться в нескольких потоках, и в каждом из них могло возникнуть исключение.
* Проигнорируем его.
*/
}
}
state.reset();
watch.stop();
LOG_INFO(log, std::fixed << std::setprecision(3)

View File

@ -43,11 +43,6 @@ struct QueryState
Context context;
/** Исключение во время выполнения запроса (его надо отдать по сети клиенту).
* Клиент сможет его принять, если оно не произошло во время отправки другого пакета.
*/
SharedPtr<Exception> exception;
bool is_cancelled;
/// Данные были отправлены.
bool sent_all_data;
@ -93,9 +88,6 @@ private:
SharedPtr<ReadBufferFromPocoSocket> in;
SharedPtr<WriteBufferFromPocoSocket> out;
/// На данный момент, поддерживается одновременное выполнение только одного запроса в соединении.
QueryState state;
/// Для сериализации пакетов "данные" и "прогресс" (пакет типа "прогресс" может отправляться из другого потока).
Poco::FastMutex send_mutex;
@ -104,7 +96,10 @@ private:
Stopwatch after_send_progress;
String default_database;
/// На данный момент, поддерживается одновременное выполнение только одного запроса в соединении.
QueryState state;
void runImpl();