dbms: Server: queries with several replicas: development [#METR-14410]

This commit is contained in:
Alexey Arno 2015-01-15 15:09:26 +03:00
parent c60ed2b1df
commit c43c758618
3 changed files with 185 additions and 33 deletions

View File

@ -36,6 +36,14 @@ namespace DB
void sendQuery(const String & query, const String & query_id = "", UInt64 stage = QueryProcessingStage::Complete,
const Settings * settings_ = nullptr, bool with_pending_data = false);
void disconnect();
void sendCancel();
void drainResidualPackets();
std::string dumpAddresses() const;
private:
using ConnectionHash = std::unordered_map<int, ConnectionInfo>;

View File

@ -85,10 +85,20 @@ public:
if (sent_query && !was_cancelled && !finished && !got_exception_from_server)
{
LOG_TRACE(log, "(" + connection->getServerAddress() + ") Cancelling query");
std::string addresses;
if (use_many_replicas)
addresses = replicas_connections->dumpAddresses();
else
addresses = connection->getServerAddress();
LOG_TRACE(log, "(" + addresses + ") Cancelling query");
/// Если запрошено прервать запрос - попросим удалённый сервер тоже прервать запрос.
if (use_many_replicas)
replicas_connections->sendCancel();
else
connection->sendCancel();
was_cancelled = true;
}
}
@ -100,8 +110,13 @@ public:
* чтобы оно не осталось висеть в рассихронизированном состоянии.
*/
if (sent_query && !finished)
{
if (use_many_replicas)
replicas_connections->disconnect();
else
connection->disconnect();
}
}
protected:
/// Отправить на удаленные сервера все временные таблицы
@ -167,11 +182,27 @@ protected:
case Protocol::Server::Exception:
got_exception_from_server = true;
if (use_many_replicas)
{
// Cancel and drain all the remaining connections.
replicas_connections->sendCancel();
replicas_connections->drainResidualPackets();
}
packet.exception->rethrow();
break;
case Protocol::Server::EndOfStream:
finished = true;
if (use_many_replicas)
{
// Cancel and drain all the remaining connections.
replicas_connections->sendCancel();
replicas_connections->drainResidualPackets();
}
return Block();
case Protocol::Server::Progress:
@ -224,12 +255,26 @@ protected:
/// Отправим просьбу прервать выполнение запроса, если ещё не отправляли.
if (!was_cancelled)
{
LOG_TRACE(log, "(" + connection->getServerAddress() + ") Cancelling query because enough data has been read");
std::string addresses;
if (use_many_replicas)
addresses = replicas_connections->dumpAddresses();
else
addresses = connection->getServerAddress();
LOG_TRACE(log, "(" + addresses + ") Cancelling query because enough data has been read");
was_cancelled = true;
if (use_many_replicas)
replicas_connections->sendCancel();
else
connection->sendCancel();
}
if (use_many_replicas)
replicas_connections->drainResidualPackets();
else
{
/// Получим оставшиеся пакеты, чтобы не было рассинхронизации в соединении с сервером.
while (true)
{
@ -256,6 +301,7 @@ protected:
throw Exception("Unknown packet from server", ErrorCodes::UNKNOWN_PACKET_FROM_SERVER);
}
}
}
finished = true;
}

View File

@ -83,13 +83,6 @@ namespace DB
{
Connection::Packet packet = info.connection->receivePacket();
if (info.packet_number == next_packet_number)
{
++info.packet_number;
++next_packet_number;
return packet;
}
switch (packet.type)
{
case Protocol::Server::Data:
@ -97,8 +90,6 @@ namespace DB
case Protocol::Server::ProfileInfo:
case Protocol::Server::Totals:
case Protocol::Server::Extremes:
case Protocol::Server::Exception:
++info.packet_number;
break;
default:
@ -106,6 +97,15 @@ namespace DB
--valid_connections_count;
break;
}
if (info.packet_number == next_packet_number)
{
++info.packet_number;
++next_packet_number;
return packet;
}
else
++info.packet_number;
}
}
}
@ -119,4 +119,102 @@ namespace DB
connection->sendQuery(query, query_id, stage, settings_, with_pending_data);
}
}
void ReplicasConnections::disconnect()
{
for (auto & e : connection_hash)
{
ConnectionInfo & info = e.second;
if (info.is_valid)
{
Connection * connection = info.connection;
connection->disconnect();
}
}
}
void ReplicasConnections::sendCancel()
{
for (auto & e : connection_hash)
{
ConnectionInfo & info = e.second;
if (info.is_valid)
{
Connection * connection = info.connection;
connection->sendCancel();
}
}
}
void ReplicasConnections::drainResidualPackets()
{
bool caught_exceptions = false;
for (auto & e : connection_hash)
{
ConnectionInfo & info = e.second;
if (info.is_valid)
{
Connection * connection = info.connection;
bool again = true;
while (again)
{
Connection::Packet packet = connection->receivePacket();
switch (packet.type)
{
case Protocol::Server::Data:
case Protocol::Server::Progress:
case Protocol::Server::ProfileInfo:
case Protocol::Server::Totals:
case Protocol::Server::Extremes:
break;
case Protocol::Server::EndOfStream:
again = false;
continue;
case Protocol::Server::Exception:
// Accumulate info from packet.exception
caught_exceptions = true;
again = false;
continue;
default:
// Accumulate info (server address)
caught_exceptions = true;
again = false;
continue;
}
}
}
}
if (caught_exceptions)
{
}
}
std::string ReplicasConnections::dumpAddresses() const
{
if (valid_connections_count == 0)
return "";
std::ostringstream os;
for (auto & e : connection_hash)
{
char prefix = '\0';
const ConnectionInfo & info = e.second;
if (info.is_valid)
{
const Connection * connection = info.connection;
os << prefix << connection->getServerAddress();
if (prefix == '\0')
prefix = ';';
}
}
return os.str();
}
}