mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-27 01:51:59 +00:00
dbms: fixed locking of connection when cancel is sent before start of query processing [#CONV-2944].
This commit is contained in:
parent
40f602ff92
commit
f9e426cf20
@ -96,10 +96,11 @@ void TCPHandler::runImpl()
|
||||
/// Восстанавливаем контекст запроса.
|
||||
query_context = connection_context;
|
||||
|
||||
/** Пакет Query. (Также, если пришёл пакет Ping - обрабатываем его и продолжаем ждать Query.)
|
||||
/** Если Query - обрабатываем. Если Ping или Cancel - возвращаемся в начало.
|
||||
* Могут прийти настройки на отдельный запрос, которые модифицируют query_context.
|
||||
*/
|
||||
receivePacket();
|
||||
if (!receivePacket())
|
||||
continue;
|
||||
|
||||
/// Обрабатываем Query
|
||||
|
||||
@ -396,42 +397,38 @@ void TCPHandler::sendHello()
|
||||
|
||||
bool TCPHandler::receivePacket()
|
||||
{
|
||||
while (true) /// Если пришёл пакет типа Ping, то обработаем его и получаем следующий пакет.
|
||||
UInt64 packet_type = 0;
|
||||
readVarUInt(packet_type, *in);
|
||||
|
||||
// std::cerr << "Packet: " << packet_type << std::endl;
|
||||
|
||||
switch (packet_type)
|
||||
{
|
||||
UInt64 packet_type = 0;
|
||||
readVarUInt(packet_type, *in);
|
||||
case Protocol::Client::Query:
|
||||
if (!state.empty())
|
||||
throw Exception("Unexpected packet Query received from client", ErrorCodes::UNEXPECTED_PACKET_FROM_CLIENT);
|
||||
receiveQuery();
|
||||
return true;
|
||||
|
||||
// std::cerr << "Packet: " << packet_type << std::endl;
|
||||
case Protocol::Client::Data:
|
||||
if (state.empty())
|
||||
throw Exception("Unexpected packet Data received from client", ErrorCodes::UNEXPECTED_PACKET_FROM_CLIENT);
|
||||
return receiveData();
|
||||
|
||||
switch (packet_type)
|
||||
{
|
||||
case Protocol::Client::Query:
|
||||
if (!state.empty())
|
||||
throw Exception("Unexpected packet Query received from client", ErrorCodes::UNEXPECTED_PACKET_FROM_CLIENT);
|
||||
receiveQuery();
|
||||
return true;
|
||||
case Protocol::Client::Ping:
|
||||
writeVarUInt(Protocol::Server::Pong, *out);
|
||||
out->next();
|
||||
return false;
|
||||
|
||||
case Protocol::Client::Data:
|
||||
if (state.empty())
|
||||
throw Exception("Unexpected packet Data received from client", ErrorCodes::UNEXPECTED_PACKET_FROM_CLIENT);
|
||||
return receiveData();
|
||||
case Protocol::Client::Cancel:
|
||||
return false;
|
||||
|
||||
case Protocol::Client::Ping:
|
||||
writeVarUInt(Protocol::Server::Pong, *out);
|
||||
out->next();
|
||||
break;
|
||||
case Protocol::Client::Hello:
|
||||
throw Exception("Unexpected packet " + String(Protocol::Client::toString(packet_type)) + " received from client",
|
||||
ErrorCodes::UNEXPECTED_PACKET_FROM_CLIENT);
|
||||
|
||||
case Protocol::Client::Cancel:
|
||||
/// Если пришёл запоздавший пакет Cancel, то игнорируем его.
|
||||
break;
|
||||
|
||||
case Protocol::Client::Hello:
|
||||
throw Exception("Unexpected packet " + String(Protocol::Client::toString(packet_type)) + " received from client",
|
||||
ErrorCodes::UNEXPECTED_PACKET_FROM_CLIENT);
|
||||
|
||||
default:
|
||||
throw Exception("Unknown packet from client", ErrorCodes::UNKNOWN_PACKET_FROM_CLIENT);
|
||||
}
|
||||
default:
|
||||
throw Exception("Unknown packet from client", ErrorCodes::UNKNOWN_PACKET_FROM_CLIENT);
|
||||
}
|
||||
}
|
||||
|
||||
@ -440,7 +437,7 @@ void TCPHandler::receiveQuery()
|
||||
{
|
||||
UInt64 stage = 0;
|
||||
UInt64 compression = 0;
|
||||
|
||||
|
||||
readIntBinary(state.query_id, *in);
|
||||
|
||||
/// Настройки на отдельный запрос.
|
||||
@ -495,7 +492,7 @@ void TCPHandler::initBlockInput()
|
||||
*state.maybe_compressed_in,
|
||||
state.io.out_sample,
|
||||
query_context.getSettingsRef().max_block_size,
|
||||
query_context.getDataTypeFactory());
|
||||
query_context.getDataTypeFactory());
|
||||
}
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user