From de3791dbdaf4199233a9242e7e7794ec43da7571 Mon Sep 17 00:00:00 2001 From: Vitaliy Lyudvichenko Date: Wed, 28 Mar 2018 17:07:28 +0300 Subject: [PATCH] Accpet client-side send and recieve timeouts. [#CLICKHOUSE-2] --- dbms/src/Client/Connection.cpp | 27 +------------------ dbms/src/Client/TimeoutSetter.h | 46 +++++++++++++++++++++++++++++++++ dbms/src/Server/TCPHandler.cpp | 11 +++++++- dbms/src/Server/TCPHandler.h | 4 +++ 4 files changed, 61 insertions(+), 27 deletions(-) create mode 100644 dbms/src/Client/TimeoutSetter.h diff --git a/dbms/src/Client/Connection.cpp b/dbms/src/Client/Connection.cpp index 0960ac8e77d..8015b708920 100644 --- a/dbms/src/Client/Connection.cpp +++ b/dbms/src/Client/Connection.cpp @@ -12,6 +12,7 @@ #include #include #include +#include #include #include #include @@ -228,32 +229,6 @@ void Connection::forceConnected() } } -struct TimeoutSetter -{ - TimeoutSetter(Poco::Net::StreamSocket & socket_, const Poco::Timespan & timeout_) - : socket(socket_), timeout(timeout_) - { - old_send_timeout = socket.getSendTimeout(); - old_receive_timeout = socket.getReceiveTimeout(); - - if (old_send_timeout > timeout) - socket.setSendTimeout(timeout); - if (old_receive_timeout > timeout) - socket.setReceiveTimeout(timeout); - } - - ~TimeoutSetter() - { - socket.setSendTimeout(old_send_timeout); - socket.setReceiveTimeout(old_receive_timeout); - } - - Poco::Net::StreamSocket & socket; - Poco::Timespan timeout; - Poco::Timespan old_send_timeout; - Poco::Timespan old_receive_timeout; -}; - bool Connection::ping() { // LOG_TRACE(log_wrapper.get(), "Ping"); diff --git a/dbms/src/Client/TimeoutSetter.h b/dbms/src/Client/TimeoutSetter.h new file mode 100644 index 00000000000..0e908c1bdac --- /dev/null +++ b/dbms/src/Client/TimeoutSetter.h @@ -0,0 +1,46 @@ +#pragma once + +#include +#include + + +namespace DB +{ + +/// Temporarily overrides socket send/recieve timeouts and reset them back into destructor +/// Timeouts could be only decreased +struct TimeoutSetter +{ + TimeoutSetter(Poco::Net::StreamSocket & socket_, const Poco::Timespan & send_timeout_, const Poco::Timespan & recieve_timeout_) + : socket(socket_), send_timeout(send_timeout_), recieve_timeout(recieve_timeout_) + { + old_send_timeout = socket.getSendTimeout(); + old_receive_timeout = socket.getReceiveTimeout(); + + if (old_send_timeout > send_timeout) + socket.setSendTimeout(send_timeout); + + if (old_receive_timeout > recieve_timeout) + socket.setReceiveTimeout(recieve_timeout); + } + + TimeoutSetter(Poco::Net::StreamSocket & socket_, const Poco::Timespan & timeout_) + : TimeoutSetter(socket_, timeout_, timeout_) {} + + ~TimeoutSetter() + { + socket.setSendTimeout(old_send_timeout); + socket.setReceiveTimeout(old_receive_timeout); + } + + Poco::Net::StreamSocket & socket; + + Poco::Timespan send_timeout; + Poco::Timespan recieve_timeout; + + Poco::Timespan old_send_timeout; + Poco::Timespan old_receive_timeout; +}; + + +} diff --git a/dbms/src/Server/TCPHandler.cpp b/dbms/src/Server/TCPHandler.cpp index faf39dded67..425f9974df2 100644 --- a/dbms/src/Server/TCPHandler.cpp +++ b/dbms/src/Server/TCPHandler.cpp @@ -31,6 +31,7 @@ #include "TCPHandler.h" #include +#include namespace DB @@ -139,6 +140,9 @@ void TCPHandler::runImpl() /// Restore context of request. query_context = connection_context; + /// If a user passed query-local timeouts, reset socket to initial state at the end of the query + SCOPE_EXIT({state.timeout_setter.reset();}); + /** If Query - process it. If Ping or Cancel - go back to the beginning. * There may come settings for a separate query that modify `query_context`. */ @@ -600,7 +604,12 @@ void TCPHandler::receiveQuery() } /// Per query settings. - query_context.getSettingsRef().deserialize(*in); + Settings & settings = query_context.getSettingsRef(); + settings.deserialize(*in); + + /// Sync timeouts on client and server during current query to avoid dangling queries on server + /// NOTE: these settings are applied only for current connection (not for distributed tables' connections) + state.timeout_setter = std::make_unique(socket(), settings.send_timeout, settings.receive_timeout); readVarUInt(stage, *in); state.stage = QueryProcessingStage::Enum(stage); diff --git a/dbms/src/Server/TCPHandler.h b/dbms/src/Server/TCPHandler.h index f53850ee487..7202067dab8 100644 --- a/dbms/src/Server/TCPHandler.h +++ b/dbms/src/Server/TCPHandler.h @@ -11,6 +11,7 @@ #include #include #include +#include #include "IServer.h" @@ -59,6 +60,9 @@ struct QueryState /// To output progress, the difference after the previous sending of progress. Progress progress; + /// Timeouts setter for current query + std::unique_ptr timeout_setter; + void reset() {