Better timeouts handling. [#CLICKHOUSE-2]

This commit is contained in:
Vitaliy Lyudvichenko 2018-04-04 15:11:38 +03:00 committed by alexey-milovidov
parent f5ec675f4a
commit ce8de108ed
5 changed files with 56 additions and 14 deletions

View File

@ -167,6 +167,8 @@ if (Poco_SQLODBC_FOUND)
target_link_libraries (dbms ${Poco_SQLODBC_LIBRARY} ${Poco_SQL_LIBRARY})
target_include_directories (dbms PRIVATE ${ODBC_INCLUDE_DIRECTORIES} ${Poco_SQLODBC_INCLUDE_DIRS} PUBLIC ${Poco_SQL_INCLUDE_DIRS})
endif()
target_include_directories (clickhouse_common_io PRIVATE ${ClickHouse_SOURCE_DIR}/contrib/poco/Data/include)
target_include_directories (dbms PRIVATE ${ClickHouse_SOURCE_DIR}/contrib/poco/Data/include)
if (Poco_DataODBC_FOUND)
target_link_libraries (clickhouse_common_io ${Poco_Data_LIBRARY})

View File

@ -234,7 +234,7 @@ bool Connection::ping()
{
// LOG_TRACE(log_wrapper.get(), "Ping");
TimeoutSetter timeout_setter(*socket, sync_request_timeout);
TimeoutSetter timeout_setter(*socket, sync_request_timeout, true);
try
{
UInt64 pong = 0;
@ -274,7 +274,7 @@ TablesStatusResponse Connection::getTablesStatus(const TablesStatusRequest & req
if (!connected)
connect();
TimeoutSetter timeout_setter(*socket, sync_request_timeout);
TimeoutSetter timeout_setter(*socket, sync_request_timeout, true);
writeVarUInt(Protocol::Client::TablesStatusRequest, *out);
request.write(*out, server_revision);

View File

@ -11,21 +11,22 @@ namespace DB
/// Timeouts could be only decreased
struct TimeoutSetter
{
TimeoutSetter(Poco::Net::StreamSocket & socket_, const Poco::Timespan & send_timeout_, const Poco::Timespan & recieve_timeout_)
TimeoutSetter(Poco::Net::StreamSocket & socket_, const Poco::Timespan & send_timeout_, const Poco::Timespan & recieve_timeout_,
bool limit_max_timeout = false)
: socket(socket_), send_timeout(send_timeout_), recieve_timeout(recieve_timeout_)
{
old_send_timeout = socket.getSendTimeout();
old_receive_timeout = socket.getReceiveTimeout();
if (old_send_timeout > send_timeout)
if (!limit_max_timeout || old_send_timeout > send_timeout)
socket.setSendTimeout(send_timeout);
if (old_receive_timeout > recieve_timeout)
if (!limit_max_timeout || old_receive_timeout > recieve_timeout)
socket.setReceiveTimeout(recieve_timeout);
}
TimeoutSetter(Poco::Net::StreamSocket & socket_, const Poco::Timespan & timeout_)
: TimeoutSetter(socket_, timeout_, timeout_) {}
TimeoutSetter(Poco::Net::StreamSocket & socket_, const Poco::Timespan & timeout_, bool limit_max_timeout = false)
: TimeoutSetter(socket_, timeout_, timeout_, limit_max_timeout) {}
~TimeoutSetter()
{

View File

@ -47,7 +47,27 @@ RemoteBlockOutputStream::RemoteBlockOutputStream(Connection & connection_, const
void RemoteBlockOutputStream::write(const Block & block)
{
assertBlocksHaveEqualStructure(block, header, "RemoteBlockOutputStream");
connection.sendData(block);
try
{
connection.sendData(block);
}
catch (const NetException & e)
{
/// Try to get more detailed exception from server
if (connection.poll(0))
{
Connection::Packet packet = connection.receivePacket();
if (Protocol::Server::Exception == packet.type)
{
packet.exception->rethrow();
return;
}
}
throw;
}
}

View File

@ -134,6 +134,7 @@ void TCPHandler::runImpl()
* The client will be able to accept it, if it did not happen while sending another packet and the client has not disconnected yet.
*/
std::unique_ptr<Exception> exception;
bool network_error = false;
try
{
@ -183,6 +184,10 @@ void TCPHandler::runImpl()
if (e.code() == ErrorCodes::UNKNOWN_PACKET_FROM_CLIENT)
throw;
/// If a timeout occurred, try to inform client about it and close the session
if (e.code() == ErrorCodes::SOCKET_TIMEOUT)
network_error = true;
}
catch (const Poco::Net::NetException & e)
{
@ -211,8 +216,6 @@ void TCPHandler::runImpl()
exception = std::make_unique<Exception>("Unknown exception", ErrorCodes::UNKNOWN_EXCEPTION);
}
bool network_error = false;
try
{
if (exception)
@ -251,6 +254,14 @@ void TCPHandler::runImpl()
void TCPHandler::readData(const Settings & global_settings)
{
auto receive_timeout = query_context.getSettingsRef().receive_timeout.value;
/// Poll interval should not be greater than receive_timeout
size_t default_poll_interval = global_settings.poll_interval.value * 1000000;
size_t current_poll_interval = static_cast<size_t>(receive_timeout.totalMicroseconds());
constexpr size_t min_poll_interval = 5000; // 5 ms
size_t poll_interval = std::max(min_poll_interval, std::min(default_poll_interval, current_poll_interval));
while (1)
{
Stopwatch watch(CLOCK_MONOTONIC_COARSE);
@ -258,7 +269,7 @@ void TCPHandler::readData(const Settings & global_settings)
/// We are waiting for a packet from the client. Thus, every `POLL_INTERVAL` seconds check whether we need to shut down.
while (1)
{
if (static_cast<ReadBufferFromPocoSocket &>(*in).poll(global_settings.poll_interval * 1000000))
if (static_cast<ReadBufferFromPocoSocket &>(*in).poll(poll_interval))
break;
/// Do we need to shut down?
@ -269,8 +280,16 @@ void TCPHandler::readData(const Settings & global_settings)
* If we periodically poll, the receive_timeout of the socket itself does not work.
* Therefore, an additional check is added.
*/
if (watch.elapsedSeconds() > global_settings.receive_timeout.totalSeconds())
throw Exception("Timeout exceeded while receiving data from client", ErrorCodes::SOCKET_TIMEOUT);
double elapsed = watch.elapsedSeconds();
if (elapsed > receive_timeout.totalSeconds())
{
std::stringstream ss;
ss << "Timeout exceeded while receiving data from client.";
ss << " Waited for " << static_cast<size_t>(elapsed) << " seconds,";
ss << " timeout is " << receive_timeout.totalSeconds() << " seconds.";
throw Exception(ss.str(), ErrorCodes::SOCKET_TIMEOUT);
}
}
/// If client disconnected.
@ -560,7 +579,7 @@ bool TCPHandler::receivePacket()
return false;
default:
throw Exception("Unknown packet from client", ErrorCodes::UNKNOWN_PACKET_FROM_CLIENT);
throw Exception("Unknown packet " + toString(packet_type) + " from client", ErrorCodes::UNKNOWN_PACKET_FROM_CLIENT);
}
}