mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-22 07:31:57 +00:00
dbms: Server: queries with several replicas: development [#METR-14410]
This commit is contained in:
parent
820e6f4498
commit
2c6d3efc61
@ -36,6 +36,14 @@ namespace DB
|
|||||||
void sendQuery(const String & query, const String & query_id = "", UInt64 stage = QueryProcessingStage::Complete,
|
void sendQuery(const String & query, const String & query_id = "", UInt64 stage = QueryProcessingStage::Complete,
|
||||||
const Settings * settings_ = nullptr, bool with_pending_data = false);
|
const Settings * settings_ = nullptr, bool with_pending_data = false);
|
||||||
|
|
||||||
|
void disconnect();
|
||||||
|
|
||||||
|
void sendCancel();
|
||||||
|
|
||||||
|
void drainResidualPackets();
|
||||||
|
|
||||||
|
std::string dumpAddresses() const;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
using ConnectionHash = std::unordered_map<int, ConnectionInfo>;
|
using ConnectionHash = std::unordered_map<int, ConnectionInfo>;
|
||||||
|
|
||||||
|
@ -85,10 +85,20 @@ public:
|
|||||||
|
|
||||||
if (sent_query && !was_cancelled && !finished && !got_exception_from_server)
|
if (sent_query && !was_cancelled && !finished && !got_exception_from_server)
|
||||||
{
|
{
|
||||||
LOG_TRACE(log, "(" + connection->getServerAddress() + ") Cancelling query");
|
std::string addresses;
|
||||||
|
if (use_many_replicas)
|
||||||
|
addresses = replicas_connections->dumpAddresses();
|
||||||
|
else
|
||||||
|
addresses = connection->getServerAddress();
|
||||||
|
|
||||||
|
LOG_TRACE(log, "(" + addresses + ") Cancelling query");
|
||||||
|
|
||||||
/// Если запрошено прервать запрос - попросим удалённый сервер тоже прервать запрос.
|
/// Если запрошено прервать запрос - попросим удалённый сервер тоже прервать запрос.
|
||||||
connection->sendCancel();
|
if (use_many_replicas)
|
||||||
|
replicas_connections->sendCancel();
|
||||||
|
else
|
||||||
|
connection->sendCancel();
|
||||||
|
|
||||||
was_cancelled = true;
|
was_cancelled = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -100,7 +110,12 @@ public:
|
|||||||
* чтобы оно не осталось висеть в рассихронизированном состоянии.
|
* чтобы оно не осталось висеть в рассихронизированном состоянии.
|
||||||
*/
|
*/
|
||||||
if (sent_query && !finished)
|
if (sent_query && !finished)
|
||||||
connection->disconnect();
|
{
|
||||||
|
if (use_many_replicas)
|
||||||
|
replicas_connections->disconnect();
|
||||||
|
else
|
||||||
|
connection->disconnect();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
@ -167,11 +182,27 @@ protected:
|
|||||||
|
|
||||||
case Protocol::Server::Exception:
|
case Protocol::Server::Exception:
|
||||||
got_exception_from_server = true;
|
got_exception_from_server = true;
|
||||||
|
|
||||||
|
if (use_many_replicas)
|
||||||
|
{
|
||||||
|
// Cancel and drain all the remaining connections.
|
||||||
|
replicas_connections->sendCancel();
|
||||||
|
replicas_connections->drainResidualPackets();
|
||||||
|
}
|
||||||
|
|
||||||
packet.exception->rethrow();
|
packet.exception->rethrow();
|
||||||
break;
|
break;
|
||||||
|
|
||||||
case Protocol::Server::EndOfStream:
|
case Protocol::Server::EndOfStream:
|
||||||
finished = true;
|
finished = true;
|
||||||
|
|
||||||
|
if (use_many_replicas)
|
||||||
|
{
|
||||||
|
// Cancel and drain all the remaining connections.
|
||||||
|
replicas_connections->sendCancel();
|
||||||
|
replicas_connections->drainResidualPackets();
|
||||||
|
}
|
||||||
|
|
||||||
return Block();
|
return Block();
|
||||||
|
|
||||||
case Protocol::Server::Progress:
|
case Protocol::Server::Progress:
|
||||||
@ -224,36 +255,51 @@ protected:
|
|||||||
/// Отправим просьбу прервать выполнение запроса, если ещё не отправляли.
|
/// Отправим просьбу прервать выполнение запроса, если ещё не отправляли.
|
||||||
if (!was_cancelled)
|
if (!was_cancelled)
|
||||||
{
|
{
|
||||||
LOG_TRACE(log, "(" + connection->getServerAddress() + ") Cancelling query because enough data has been read");
|
std::string addresses;
|
||||||
|
if (use_many_replicas)
|
||||||
|
addresses = replicas_connections->dumpAddresses();
|
||||||
|
else
|
||||||
|
addresses = connection->getServerAddress();
|
||||||
|
|
||||||
|
LOG_TRACE(log, "(" + addresses + ") Cancelling query because enough data has been read");
|
||||||
|
|
||||||
was_cancelled = true;
|
was_cancelled = true;
|
||||||
connection->sendCancel();
|
|
||||||
|
if (use_many_replicas)
|
||||||
|
replicas_connections->sendCancel();
|
||||||
|
else
|
||||||
|
connection->sendCancel();
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Получим оставшиеся пакеты, чтобы не было рассинхронизации в соединении с сервером.
|
if (use_many_replicas)
|
||||||
while (true)
|
replicas_connections->drainResidualPackets();
|
||||||
|
else
|
||||||
{
|
{
|
||||||
Connection::Packet packet = connection->receivePacket();
|
/// Получим оставшиеся пакеты, чтобы не было рассинхронизации в соединении с сервером.
|
||||||
|
while (true)
|
||||||
switch (packet.type)
|
|
||||||
{
|
{
|
||||||
case Protocol::Server::Data:
|
Connection::Packet packet = connection->receivePacket();
|
||||||
case Protocol::Server::Progress:
|
|
||||||
case Protocol::Server::ProfileInfo:
|
|
||||||
case Protocol::Server::Totals:
|
|
||||||
case Protocol::Server::Extremes:
|
|
||||||
break;
|
|
||||||
|
|
||||||
case Protocol::Server::EndOfStream:
|
switch (packet.type)
|
||||||
return;
|
{
|
||||||
|
case Protocol::Server::Data:
|
||||||
|
case Protocol::Server::Progress:
|
||||||
|
case Protocol::Server::ProfileInfo:
|
||||||
|
case Protocol::Server::Totals:
|
||||||
|
case Protocol::Server::Extremes:
|
||||||
|
break;
|
||||||
|
|
||||||
case Protocol::Server::Exception:
|
case Protocol::Server::EndOfStream:
|
||||||
got_exception_from_server = true;
|
return;
|
||||||
packet.exception->rethrow();
|
|
||||||
break;
|
|
||||||
|
|
||||||
default:
|
case Protocol::Server::Exception:
|
||||||
throw Exception("Unknown packet from server", ErrorCodes::UNKNOWN_PACKET_FROM_SERVER);
|
got_exception_from_server = true;
|
||||||
|
packet.exception->rethrow();
|
||||||
|
break;
|
||||||
|
|
||||||
|
default:
|
||||||
|
throw Exception("Unknown packet from server", ErrorCodes::UNKNOWN_PACKET_FROM_SERVER);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -83,13 +83,6 @@ namespace DB
|
|||||||
{
|
{
|
||||||
Connection::Packet packet = info.connection->receivePacket();
|
Connection::Packet packet = info.connection->receivePacket();
|
||||||
|
|
||||||
if (info.packet_number == next_packet_number)
|
|
||||||
{
|
|
||||||
++info.packet_number;
|
|
||||||
++next_packet_number;
|
|
||||||
return packet;
|
|
||||||
}
|
|
||||||
|
|
||||||
switch (packet.type)
|
switch (packet.type)
|
||||||
{
|
{
|
||||||
case Protocol::Server::Data:
|
case Protocol::Server::Data:
|
||||||
@ -97,8 +90,6 @@ namespace DB
|
|||||||
case Protocol::Server::ProfileInfo:
|
case Protocol::Server::ProfileInfo:
|
||||||
case Protocol::Server::Totals:
|
case Protocol::Server::Totals:
|
||||||
case Protocol::Server::Extremes:
|
case Protocol::Server::Extremes:
|
||||||
case Protocol::Server::Exception:
|
|
||||||
++info.packet_number;
|
|
||||||
break;
|
break;
|
||||||
|
|
||||||
default:
|
default:
|
||||||
@ -106,6 +97,15 @@ namespace DB
|
|||||||
--valid_connections_count;
|
--valid_connections_count;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (info.packet_number == next_packet_number)
|
||||||
|
{
|
||||||
|
++info.packet_number;
|
||||||
|
++next_packet_number;
|
||||||
|
return packet;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
++info.packet_number;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -119,4 +119,102 @@ namespace DB
|
|||||||
connection->sendQuery(query, query_id, stage, settings_, with_pending_data);
|
connection->sendQuery(query, query_id, stage, settings_, with_pending_data);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void ReplicasConnections::disconnect()
|
||||||
|
{
|
||||||
|
for (auto & e : connection_hash)
|
||||||
|
{
|
||||||
|
ConnectionInfo & info = e.second;
|
||||||
|
if (info.is_valid)
|
||||||
|
{
|
||||||
|
Connection * connection = info.connection;
|
||||||
|
connection->disconnect();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void ReplicasConnections::sendCancel()
|
||||||
|
{
|
||||||
|
for (auto & e : connection_hash)
|
||||||
|
{
|
||||||
|
ConnectionInfo & info = e.second;
|
||||||
|
if (info.is_valid)
|
||||||
|
{
|
||||||
|
Connection * connection = info.connection;
|
||||||
|
connection->sendCancel();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void ReplicasConnections::drainResidualPackets()
|
||||||
|
{
|
||||||
|
bool caught_exceptions = false;
|
||||||
|
|
||||||
|
for (auto & e : connection_hash)
|
||||||
|
{
|
||||||
|
ConnectionInfo & info = e.second;
|
||||||
|
if (info.is_valid)
|
||||||
|
{
|
||||||
|
Connection * connection = info.connection;
|
||||||
|
bool again = true;
|
||||||
|
|
||||||
|
while (again)
|
||||||
|
{
|
||||||
|
Connection::Packet packet = connection->receivePacket();
|
||||||
|
|
||||||
|
switch (packet.type)
|
||||||
|
{
|
||||||
|
case Protocol::Server::Data:
|
||||||
|
case Protocol::Server::Progress:
|
||||||
|
case Protocol::Server::ProfileInfo:
|
||||||
|
case Protocol::Server::Totals:
|
||||||
|
case Protocol::Server::Extremes:
|
||||||
|
break;
|
||||||
|
|
||||||
|
case Protocol::Server::EndOfStream:
|
||||||
|
again = false;
|
||||||
|
continue;
|
||||||
|
|
||||||
|
case Protocol::Server::Exception:
|
||||||
|
// Accumulate info from packet.exception
|
||||||
|
caught_exceptions = true;
|
||||||
|
again = false;
|
||||||
|
continue;
|
||||||
|
|
||||||
|
default:
|
||||||
|
// Accumulate info (server address)
|
||||||
|
caught_exceptions = true;
|
||||||
|
again = false;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (caught_exceptions)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
std::string ReplicasConnections::dumpAddresses() const
|
||||||
|
{
|
||||||
|
if (valid_connections_count == 0)
|
||||||
|
return "";
|
||||||
|
|
||||||
|
std::ostringstream os;
|
||||||
|
for (auto & e : connection_hash)
|
||||||
|
{
|
||||||
|
char prefix = '\0';
|
||||||
|
const ConnectionInfo & info = e.second;
|
||||||
|
if (info.is_valid)
|
||||||
|
{
|
||||||
|
const Connection * connection = info.connection;
|
||||||
|
os << prefix << connection->getServerAddress();
|
||||||
|
if (prefix == '\0')
|
||||||
|
prefix = ';';
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return os.str();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user