[WIP] Extended Ping Pong packets. [#CLICKHOUSE-3729]

This commit is contained in:
Vitaliy Lyudvichenko 2018-06-21 19:40:15 +03:00
parent 1a4157f2a9
commit 61f9007ca1
9 changed files with 244 additions and 67 deletions

View File

@ -962,10 +962,17 @@ private:
connection->sendData(block);
processed_rows += block.rows();
/// Check if server send Log packet
auto packet_type = connection->checkPacket();
if (packet_type && *packet_type == Protocol::Server::Log)
receiveAndProcessPacket();
/// Check if server send a packet that should be processed immediately
if (auto packet_type = connection->checkPacketType())
{
if (*packet_type == Protocol::Server::Log || *packet_type == Protocol::Server::Pong)
receiveAndProcessPacket();
else if (*packet_type == Protocol::Server::Exception)
{
/// Break insert in an exception ocurred
break;
}
}
if (!block)
break;
@ -1057,6 +1064,9 @@ private:
onProgress(packet.progress);
return true;
case Protocol::Server::Pong:
return false;
case Protocol::Server::ProfileInfo:
onProfileInfo(packet.profile_info);
return true;
@ -1083,7 +1093,7 @@ private:
return false;
default:
throw Exception("Unknown packet from server", ErrorCodes::UNKNOWN_PACKET_FROM_SERVER);
throw Exception("Unknown packet " + std::to_string(packet.type) + " from server", ErrorCodes::UNKNOWN_PACKET_FROM_SERVER);
}
}
@ -1110,6 +1120,9 @@ private:
onLogData(packet.block);
break;
case Protocol::Server::Pong:
break;
default:
throw NetException("Unexpected packet from server (expected Data, Exception or Log, got "
+ String(Protocol::Server::toString(packet.type)) + ")", ErrorCodes::UNEXPECTED_PACKET_FROM_SERVER);
@ -1118,7 +1131,7 @@ private:
}
/// Process Log packets, exit when recieve Exception or EndOfStream
/// Process Log packets, exit when receieve Exception or EndOfStream
bool receiveEndOfQuery()
{
while (true)
@ -1140,6 +1153,9 @@ private:
onLogData(packet.block);
break;
case Protocol::Server::Pong:
break;
default:
throw NetException("Unexpected packet from server (expected Exception, EndOfStream or Log, got "
+ String(Protocol::Server::toString(packet.type)) + ")", ErrorCodes::UNEXPECTED_PACKET_FROM_SERVER);

View File

@ -52,6 +52,24 @@ namespace ErrorCodes
}
bool TCPHandler::poll(size_t timeout_microseconds)
{
return static_cast<ReadBufferFromPocoSocket &>(*in).poll(timeout_microseconds);
}
bool TCPHandler::hasReadPendingData() const
{
return in_last_packet_type.has_value() || static_cast<const ReadBufferFromPocoSocket &>(*in).hasPendingData();
}
bool TCPHandler::eof() const
{
return !in_last_packet_type.has_value() && in->eof();
}
void TCPHandler::runImpl()
{
connection_context = server.context();
@ -63,10 +81,11 @@ void TCPHandler::runImpl()
socket().setSendTimeout(global_settings.send_timeout);
socket().setNoDelay(true);
in_last_packet_type.reset();
in = std::make_shared<ReadBufferFromPocoSocket>(socket());
out = std::make_shared<WriteBufferFromPocoSocket>(socket());
if (in->eof())
if (eof())
{
LOG_WARNING(log, "Client has not sent any data.");
return;
@ -119,14 +138,14 @@ void TCPHandler::runImpl()
connection_context.setProgressCallback([this] (const Progress & value) { return this->updateProgress(value); });
while (1)
while (true)
{
/// We are waiting for a packet from the client. Thus, every `POLL_INTERVAL` seconds check whether we need to shut down.
while (!static_cast<ReadBufferFromPocoSocket &>(*in).poll(global_settings.poll_interval * 1000000) && !server.isCancelled())
while (!hasReadPendingData() && !poll(global_settings.poll_interval * 1000000) && !server.isCancelled())
;
/// If we need to shut down, or client disconnects.
if (server.isCancelled() || in->eof())
if (server.isCancelled() || eof())
break;
Stopwatch watch;
@ -146,11 +165,27 @@ void TCPHandler::runImpl()
/// If a user passed query-local timeouts, reset socket to initial state at the end of the query
SCOPE_EXIT({state.timeout_setter.reset();});
/** If Query - process it. If Ping or Cancel - go back to the beginning.
/** If Query - process it. If Ping, Cancel or TablesStatusRequest - go back to the beginning.
* There may come settings for a separate query that modify `query_context`.
*/
if (!receivePacket())
continue;
UInt64 packet_type = receivePacketType();
switch (packet_type)
{
case Protocol::Client::Query:
receiveAndProcessPacket();
break;
case Protocol::Client::Ping:
case Protocol::Client::Cancel:
case Protocol::Client::TablesStatusRequest:
receiveAndProcessPacket();
/// exit from the loop iteration
continue;
default:
throw Exception("Unexpected packet " + String(Protocol::Client::toString(packet_type)) + " received from client",
ErrorCodes::UNEXPECTED_PACKET_FROM_CLIENT);
}
CurrentThread::initializeQuery();
@ -192,7 +227,7 @@ void TCPHandler::runImpl()
processOrdinaryQuery();
/// Reset BlockIO in advance to log destruction actions
state.io.reset();
//state.io.reset();
sendLogs();
sendEndOfStream();
@ -332,12 +367,31 @@ void TCPHandler::readData(const Settings & global_settings)
}
/// If client disconnected.
if (in->eof())
if (eof())
return;
/// We accept and process data. And if they are over, then we leave.
if (!receivePacket())
UInt64 packet_type = receivePacketType();
if (packet_type == Protocol::Client::Data)
{
/// Accept and process data. And if they are over, then we leave.
if (!receiveAndProcessPacket())
break;
}
else if (packet_type == Protocol::Client::Ping && client_revision >= DBMS_MIN_REVISION_WITH_EXTENDED_PING)
{
/// We allow ping packets in the middle of the query only for new enough clients
receiveAndProcessPacket();
}
else if (packet_type == Protocol::Client::Cancel)
{
receiveAndProcessPacket();
break;
}
else
{
throw Exception("Unexpected packet " + String(Protocol::Client::toString(packet_type)) + " received from client",
ErrorCodes::UNEXPECTED_PACKET_FROM_CLIENT);
}
sendLogs();
}
@ -521,11 +575,12 @@ void TCPHandler::sendExtremes()
void TCPHandler::receiveHello()
{
/// Receive `hello` packet.
UInt64 packet_type = 0;
UInt64 packet_type = receivePacketType();
in_last_packet_type.reset();
String user = "default";
String password;
readVarUInt(packet_type, *in);
if (packet_type != Protocol::Client::Hello)
{
/** If you accidentally accessed the HTTP protocol for a port destined for an internal TCP protocol,
@ -583,10 +638,22 @@ void TCPHandler::sendHello()
}
bool TCPHandler::receivePacket()
UInt64 TCPHandler::receivePacketType()
{
UInt64 packet_type = 0;
readVarUInt(packet_type, *in);
if (!in_last_packet_type.has_value())
{
UInt64 packet_type;
readVarUInt(packet_type, *in);
in_last_packet_type.emplace(packet_type);
}
return in_last_packet_type.value();
}
bool TCPHandler::receiveAndProcessPacket()
{
UInt64 packet_type = receivePacketType();
in_last_packet_type.reset();
// std::cerr << "Packet: " << packet_type << std::endl;
@ -604,8 +671,7 @@ bool TCPHandler::receivePacket()
return receiveData();
case Protocol::Client::Ping:
writeVarUInt(Protocol::Server::Pong, *out);
out->next();
processPing();
return false;
case Protocol::Client::Cancel:
@ -725,6 +791,22 @@ bool TCPHandler::receiveData()
}
void TCPHandler::processPing()
{
/// New versions send ID of ping packet to distinguish them
bool receive_id = client_revision >= DBMS_MIN_REVISION_WITH_EXTENDED_PING;
UInt64 id;
if (receive_id)
readVarUInt(id, *in);
writeVarUInt(Protocol::Server::Pong, *out);
if (receive_id)
writeVarUInt(id, *out);
out->next();
}
void TCPHandler::initBlockInput()
{
if (!state.block_in)

View File

@ -13,6 +13,7 @@
#include <DataStreams/BlockIO.h>
#include <Interpreters/InternalTextLogsQueue.h>
#include <Client/TimeoutSetter.h>
#include <optional>
#include "IServer.h"
@ -112,6 +113,19 @@ private:
std::shared_ptr<ReadBuffer> in;
std::shared_ptr<WriteBuffer> out;
/// Contains type of a packet read from in buffer, but the packet has not been processed yet.
/// In fact, if it not empty, it is additional bytes in the head of in buffer.
std::optional<UInt64> in_last_packet_type;
/// Check, if has data to read.
bool poll(size_t timeout_microseconds = 0);
/// Check, if has data in read buffer or there is a pending packet type.
bool hasReadPendingData() const;
/// in->eof() and there is no read packet type.
bool eof() const;
/// Time after the last check to stop the request and send the progress.
Stopwatch after_check_cancelled;
Stopwatch after_send_progress;
@ -129,9 +143,11 @@ private:
void runImpl();
void receiveHello();
bool receivePacket();
UInt64 receivePacketType();
bool receiveAndProcessPacket();
void receiveQuery();
bool receiveData();
void processPing();
void readData(const Settings & global_settings);
/// Process INSERT query

View File

@ -113,12 +113,13 @@ void Connection::disconnect()
//LOG_TRACE(log_wrapper.get(), "Disconnecting");
in = nullptr;
last_input_packet_type.reset();
in_last_packet_type.reset();
out = nullptr; // can write to socket
if (socket)
socket->close();
socket = nullptr;
connected = false;
ping_id_increment = 0;
}
@ -264,28 +265,44 @@ bool Connection::ping()
TimeoutSetter timeout_setter(*socket, sync_request_timeout, true);
try
{
UInt64 pong = 0;
bool send_ping_id = server_revision >= DBMS_MIN_REVISION_WITH_EXTENDED_PING;
UInt64 ping_id = 0;
writeVarUInt(Protocol::Client::Ping, *out);
if (send_ping_id)
{
ping_id = ++ping_id_increment;
writeVarUInt(ping_id, *out);
}
out->next();
if (in->eof())
if (eof())
return false;
readVarUInt(pong, *in);
/// Could receive late packets with progress. TODO: Maybe possible to fix.
while (pong == Protocol::Server::Progress)
while (true)
{
receiveProgress();
UInt64 packet_type = receivePacketType();
if (in->eof())
return false;
/// Could receive late packets with progress. TODO: Maybe possible to fix.
if (packet_type == Protocol::Server::Progress)
{
receivePacket();
if (eof())
return false;
}
else if (packet_type == Protocol::Server::Pong)
{
if (receivePong() != ping_id)
throw NetException("Unexpected ping-pong response from server", ErrorCodes::UNEXPECTED_PACKET_FROM_SERVER);
readVarUInt(pong, *in);
/// Exit at first Pong
break;
}
else
{
throwUnexpectedPacket(packet_type, "Pong");
}
}
if (pong != Protocol::Server::Pong)
throwUnexpectedPacket(pong, "Pong");
}
catch (const Poco::Exception & e)
{
@ -507,48 +524,57 @@ bool Connection::poll(size_t timeout_microseconds)
bool Connection::hasReadPendingData() const
{
return last_input_packet_type.has_value() || static_cast<const ReadBufferFromPocoSocket &>(*in).hasPendingData();
return in_last_packet_type.has_value() || static_cast<const ReadBufferFromPocoSocket &>(*in).hasPendingData();
}
bool Connection::eof() const
{
return !in_last_packet_type.has_value() && in->eof();
}
std::optional<UInt64> Connection::checkPacket(size_t timeout_microseconds)
std::optional<UInt64> Connection::checkPacketType(size_t timeout_microseconds)
{
if (last_input_packet_type.has_value())
return last_input_packet_type;
if (in_last_packet_type.has_value())
return in_last_packet_type;
if (hasReadPendingData() || poll(timeout_microseconds))
{
// LOG_TRACE(log_wrapper.get(), "Receiving packet type");
LOG_TRACE(log_wrapper.get(), "Receiving packet type");
UInt64 packet_type;
readVarUInt(packet_type, *in);
last_input_packet_type.emplace(packet_type);
return last_input_packet_type;
in_last_packet_type.emplace(packet_type);
return in_last_packet_type;
}
return {};
}
UInt64 Connection::receivePacketType()
{
/// Have we already read packet type?
if (!in_last_packet_type.has_value())
{
LOG_TRACE(log_wrapper.get(), "Receiving packet type");
UInt64 packet_type = 0;
readVarUInt(packet_type, *in);
in_last_packet_type.emplace(packet_type);
}
return in_last_packet_type.value();
}
Connection::Packet Connection::receivePacket()
{
try
{
Packet res;
res.type = receivePacketType();
in_last_packet_type.reset();
/// Have we already read packet type?
if (last_input_packet_type)
{
res.type = *last_input_packet_type;
last_input_packet_type.reset();
}
else
{
//LOG_TRACE(log_wrapper.get(), "Receiving packet type");
readVarUInt(res.type, *in);
}
//LOG_TRACE(log_wrapper.get(), "Receiving packet " << res.type << " " << Protocol::Server::toString(res.type));
LOG_TRACE(log_wrapper.get(), "Receiving packet " << res.type << " " << Protocol::Server::toString(res.type));
switch (res.type)
{
@ -564,6 +590,10 @@ Connection::Packet Connection::receivePacket()
res.progress = receiveProgress();
return res;
case Protocol::Server::Pong:
receivePong();
return res;
case Protocol::Server::ProfileInfo:
res.profile_info = receiveProfileInfo();
return res;
@ -701,6 +731,26 @@ Progress Connection::receiveProgress()
}
UInt64 Connection::receivePong()
{
bool receive_id = server_revision >= DBMS_MIN_REVISION_WITH_EXTENDED_PING;
if (receive_id)
{
UInt64 pong_id;
readVarUInt(pong_id, *in);
/// Invalid response
if (pong_id > ping_id_increment)
throw NetException("Unexpected ping-pong response from server", ErrorCodes::UNEXPECTED_PACKET_FROM_SERVER);
return pong_id;
}
return std::numeric_limits<UInt64>::max();
}
BlockStreamProfileInfo Connection::receiveProfileInfo()
{
BlockStreamProfileInfo profile_info;

View File

@ -24,6 +24,8 @@
#include <atomic>
#include <optional>
#include "../IO/Progress.h"
#include "../Common/Exception.h"
namespace DB
@ -138,11 +140,17 @@ public:
/// Check, if has data to read.
bool poll(size_t timeout_microseconds = 0);
/// Check, if has data in read buffer.
/// Check, if has data in read buffer or read packet type.
bool hasReadPendingData() const;
/// in->eof() and there is no read packet type.
bool eof() const;
/// Checks if there is input data in connection and reads packet ID.
std::optional<UInt64> checkPacket(size_t timeout_microseconds = 0);
std::optional<UInt64> checkPacketType(size_t timeout_microseconds = 0);
/// Read packet type or returns already read one.
UInt64 receivePacketType();
/// Receive packet from server.
Packet receivePacket();
@ -198,7 +206,7 @@ private:
std::unique_ptr<Poco::Net::StreamSocket> socket;
std::shared_ptr<ReadBuffer> in;
std::shared_ptr<WriteBuffer> out;
std::optional<UInt64> last_input_packet_type;
std::optional<UInt64> in_last_packet_type;
String query_id;
Protocol::Compression compression; /// Enable data compression for communication.
@ -253,12 +261,16 @@ private:
void receiveHello();
bool ping();
/// Is used to generate ping ids
UInt64 ping_id_increment = 0;
Block receiveData();
Block receiveLogData();
Block receiveDataImpl(BlockInputStreamPtr & stream);
std::unique_ptr<Exception> receiveException();
Progress receiveProgress();
UInt64 receivePong();
BlockStreamProfileInfo receiveProfileInfo();
void initInputBuffers();

View File

@ -37,9 +37,9 @@ class MemoryTracker
const char * description = nullptr;
public:
MemoryTracker(VariableContext level = VariableContext::Thread) : level(level) {}
MemoryTracker(Int64 limit_, VariableContext level = VariableContext::Thread) : limit(limit_), level(level) {}
MemoryTracker(MemoryTracker * parent_, VariableContext level = VariableContext::Thread) : parent(parent_), level(level) {}
explicit MemoryTracker(VariableContext level = VariableContext::Thread) : level(level) {}
explicit MemoryTracker(Int64 limit_, VariableContext level = VariableContext::Thread) : limit(limit_), level(level) {}
explicit MemoryTracker(MemoryTracker * parent_, VariableContext level = VariableContext::Thread) : parent(parent_), level(level) {}
~MemoryTracker();

View File

@ -62,6 +62,7 @@
#define DBMS_MIN_REVISION_WITH_TIME_ZONE_PARAMETER_IN_DATETIME_DATA_TYPE 54337
#define DBMS_MIN_REVISION_WITH_SERVER_DISPLAY_NAME 54372
#define DBMS_MIN_REVISION_WITH_SERVER_LOGS 54386
#define DBMS_MIN_REVISION_WITH_EXTENDED_PING 54386
/// Version of ClickHouse TCP protocol. Set to git tag with latest protocol change.
#define DBMS_TCP_PROTOCOL_VERSION 54226

View File

@ -67,7 +67,7 @@ void RemoteBlockOutputStream::write(const Block & block)
catch (const NetException &)
{
/// Try to get more detailed exception from server
auto packet_type = connection.checkPacket();
auto packet_type = connection.checkPacketType();
if (packet_type && *packet_type == Protocol::Server::Exception)
{
Connection::Packet packet = connection.receivePacket();

View File

@ -15,7 +15,7 @@ rm -f "$server_logs_file"
settings="$server_logs --log_queries=1 --log_query_threads=1 --log_profile_events=1 --log_query_settings=1"
# Test insert logging on each block and checkPacket() method
# Test insert logging on each block and checkPacketType() method
$CLICKHOUSE_CLIENT $settings -n -q "
DROP TABLE IF EXISTS test.null;