dbms: fixed error [#CONV-2944].

This commit is contained in:
Alexey Milovidov 2012-12-06 17:32:48 +00:00
parent 2b9f4ca2bf
commit 8a053aba54
3 changed files with 41 additions and 20 deletions

View File

@ -144,11 +144,13 @@ public:
finish = true; finish = true;
cancel(); cancel();
ExceptionPtr exception;
/// Вынем всё, что есть в очереди готовых данных. /// Вынем всё, что есть в очереди готовых данных.
OutputData res; OutputData res;
while (output_queue.tryPop(res)) while (output_queue.tryPop(res))
if (res.exception && !std::uncaught_exception()) if (res.exception && !exception)
res.exception->rethrow(); exception = res.exception;
/** В этот момент, запоздавшие потоки ещё могут вставить в очередь какие-нибудь блоки, но очередь не переполнится. /** В этот момент, запоздавшие потоки ещё могут вставить в очередь какие-нибудь блоки, но очередь не переполнится.
* PS. Может быть, для переменной finish нужен барьер? * PS. Может быть, для переменной finish нужен барьер?
@ -159,8 +161,11 @@ public:
/// Может быть, нам под конец положили эксепшен. /// Может быть, нам под конец положили эксепшен.
while (output_queue.tryPop(res)) while (output_queue.tryPop(res))
if (res.exception && !std::uncaught_exception()) if (res.exception && !exception)
res.exception->rethrow(); exception = res.exception;
if (exception && !std::uncaught_exception())
exception->rethrow();
LOG_TRACE(log, "Waited for threads to finish"); LOG_TRACE(log, "Waited for threads to finish");
} }

View File

@ -69,6 +69,11 @@ void TCPHandler::runImpl()
Stopwatch watch; Stopwatch watch;
state.reset(); state.reset();
/** Исключение во время выполнения запроса (его надо отдать по сети клиенту).
* Клиент сможет его принять, если оно не произошло во время отправки другого пакета.
*/
ExceptionPtr exception;
try try
{ {
/// Пакет Query. (Также, если пришёл пакет Ping - обрабатываем его и продолжаем ждать Query.) /// Пакет Query. (Также, если пришёл пакет Ping - обрабатываем его и продолжаем ждать Query.)
@ -90,12 +95,14 @@ void TCPHandler::runImpl()
processOrdinaryQuery(); processOrdinaryQuery();
sendEndOfStream(); sendEndOfStream();
state.reset();
} }
catch (DB::Exception & e) catch (DB::Exception & e)
{ {
LOG_ERROR(log, "DB::Exception. Code: " << e.code() << ", e.displayText() = " << e.displayText() << ", e.what() = " << e.what() LOG_ERROR(log, "DB::Exception. Code: " << e.code() << ", e.displayText() = " << e.displayText() << ", e.what() = " << e.what()
<< ", Stack trace:\n\n" << e.getStackTrace().toString()); << ", Stack trace:\n\n" << e.getStackTrace().toString());
state.exception = e.clone(); exception = e.clone();
if (e.code() == ErrorCodes::UNKNOWN_PACKET_FROM_CLIENT) if (e.code() == ErrorCodes::UNKNOWN_PACKET_FROM_CLIENT)
throw; throw;
@ -104,23 +111,37 @@ void TCPHandler::runImpl()
{ {
LOG_ERROR(log, "Poco::Exception. Code: " << ErrorCodes::POCO_EXCEPTION << ", e.code() = " << e.code() LOG_ERROR(log, "Poco::Exception. Code: " << ErrorCodes::POCO_EXCEPTION << ", e.code() = " << e.code()
<< ", e.displayText() = " << e.displayText() << ", e.what() = " << e.what()); << ", e.displayText() = " << e.displayText() << ", e.what() = " << e.what());
state.exception = new Exception(e.displayText(), e.code()); exception = new Exception(e.displayText(), e.code());
} }
catch (std::exception & e) catch (std::exception & e)
{ {
LOG_ERROR(log, "std::exception. Code: " << ErrorCodes::STD_EXCEPTION << ", e.what() = " << e.what()); LOG_ERROR(log, "std::exception. Code: " << ErrorCodes::STD_EXCEPTION << ", e.what() = " << e.what());
state.exception = new Exception(e.what(), ErrorCodes::STD_EXCEPTION); exception = new Exception(e.what(), ErrorCodes::STD_EXCEPTION);
} }
catch (...) catch (...)
{ {
LOG_ERROR(log, "Unknown exception. Code: " << ErrorCodes::UNKNOWN_EXCEPTION); LOG_ERROR(log, "Unknown exception. Code: " << ErrorCodes::UNKNOWN_EXCEPTION);
state.exception = new Exception("Unknown exception", ErrorCodes::UNKNOWN_EXCEPTION); exception = new Exception("Unknown exception", ErrorCodes::UNKNOWN_EXCEPTION);
} }
if (state.exception) if (exception)
sendException(*state.exception); {
sendException(*exception);
try
{
state.reset(); state.reset();
}
catch (...)
{
/** В процессе обработки запроса было исключение, которое мы поймали и отправили клиенту.
* При уничтожении конвейера выполнения запроса, было второе исключение.
* Например, конвейер мог выполняться в нескольких потоках, и в каждом из них могло возникнуть исключение.
* Проигнорируем его.
*/
}
}
watch.stop(); watch.stop();
LOG_INFO(log, std::fixed << std::setprecision(3) LOG_INFO(log, std::fixed << std::setprecision(3)

View File

@ -43,11 +43,6 @@ struct QueryState
Context context; Context context;
/** Исключение во время выполнения запроса (его надо отдать по сети клиенту).
* Клиент сможет его принять, если оно не произошло во время отправки другого пакета.
*/
SharedPtr<Exception> exception;
bool is_cancelled; bool is_cancelled;
/// Данные были отправлены. /// Данные были отправлены.
bool sent_all_data; bool sent_all_data;
@ -93,9 +88,6 @@ private:
SharedPtr<ReadBufferFromPocoSocket> in; SharedPtr<ReadBufferFromPocoSocket> in;
SharedPtr<WriteBufferFromPocoSocket> out; SharedPtr<WriteBufferFromPocoSocket> out;
/// На данный момент, поддерживается одновременное выполнение только одного запроса в соединении.
QueryState state;
/// Для сериализации пакетов "данные" и "прогресс" (пакет типа "прогресс" может отправляться из другого потока). /// Для сериализации пакетов "данные" и "прогресс" (пакет типа "прогресс" может отправляться из другого потока).
Poco::FastMutex send_mutex; Poco::FastMutex send_mutex;
@ -105,6 +97,9 @@ private:
String default_database; String default_database;
/// На данный момент, поддерживается одновременное выполнение только одного запроса в соединении.
QueryState state;
void runImpl(); void runImpl();