From 3f62b5bb2e81a39d770503238d185df753678159 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Sat, 20 Oct 2012 06:40:55 +0000 Subject: [PATCH] dbms: development [#CONV-2944]. --- dbms/include/DB/DataStreams/UnionBlockInputStream.h | 2 ++ dbms/src/Client/Connection.cpp | 6 +++--- dbms/src/Server/TCPHandler.cpp | 11 +++++++++++ 3 files changed, 16 insertions(+), 3 deletions(-) diff --git a/dbms/include/DB/DataStreams/UnionBlockInputStream.h b/dbms/include/DB/DataStreams/UnionBlockInputStream.h index 7b6eac1f702..473924ea82f 100644 --- a/dbms/include/DB/DataStreams/UnionBlockInputStream.h +++ b/dbms/include/DB/DataStreams/UnionBlockInputStream.h @@ -102,6 +102,8 @@ public: finish = true; cancel(); + + // TODO: может быть, здесь может возникнуть блокировка, если очередь переполнена, и какой-нибудь из потоков попытается положить в неё блок. for (ThreadsData::iterator it = threads_data.begin(); it != threads_data.end(); ++it) it->thread->join(); diff --git a/dbms/src/Client/Connection.cpp b/dbms/src/Client/Connection.cpp index 44e591da7ba..b8686d0067c 100644 --- a/dbms/src/Client/Connection.cpp +++ b/dbms/src/Client/Connection.cpp @@ -22,6 +22,9 @@ void Connection::connect() { try { + if (connected) + disconnect(); + LOG_TRACE(log, "Connecting"); socket.connect(Poco::Net::SocketAddress(host, port), connect_timeout); @@ -161,9 +164,6 @@ bool Connection::ping() } catch (const Poco::Exception & e) { - /// Закроем соединение, чтобы не было рассинхронизации. - disconnect(); - LOG_TRACE(log, e.displayText()); return false; } diff --git a/dbms/src/Server/TCPHandler.cpp b/dbms/src/Server/TCPHandler.cpp index 443a7463443..36d2d632f77 100644 --- a/dbms/src/Server/TCPHandler.cpp +++ b/dbms/src/Server/TCPHandler.cpp @@ -166,7 +166,10 @@ void TCPHandler::processOrdinaryQuery() break; } else if (isQueryCancelled()) + { async_in.cancel(); + break; + } } sendData(block); @@ -263,6 +266,14 @@ bool TCPHandler::receivePacket() out->next(); break; + case Protocol::Client::Cancel: + /// Если пришёл запоздавший пакет Cancel, то игнорируем его. + break; + + case Protocol::Client::Hello: + throw Exception("Unexpected packet " + String(Protocol::Client::toString(Protocol::Client::Enum(packet_type))) + " received from client", + ErrorCodes::UNEXPECTED_PACKET_FROM_CLIENT); + default: throw Exception("Unknown packet from client", ErrorCodes::UNKNOWN_PACKET_FROM_CLIENT); }