mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-10 17:44:23 +00:00
Accpet client-side send and recieve timeouts. [#CLICKHOUSE-2]
This commit is contained in:
parent
701b58b4ff
commit
de3791dbda
@ -12,6 +12,7 @@
|
||||
#include <DataStreams/NativeBlockInputStream.h>
|
||||
#include <DataStreams/NativeBlockOutputStream.h>
|
||||
#include <Client/Connection.h>
|
||||
#include <Client/TimeoutSetter.h>
|
||||
#include <Common/ClickHouseRevision.h>
|
||||
#include <Common/Exception.h>
|
||||
#include <Common/NetException.h>
|
||||
@ -228,32 +229,6 @@ void Connection::forceConnected()
|
||||
}
|
||||
}
|
||||
|
||||
struct TimeoutSetter
|
||||
{
|
||||
TimeoutSetter(Poco::Net::StreamSocket & socket_, const Poco::Timespan & timeout_)
|
||||
: socket(socket_), timeout(timeout_)
|
||||
{
|
||||
old_send_timeout = socket.getSendTimeout();
|
||||
old_receive_timeout = socket.getReceiveTimeout();
|
||||
|
||||
if (old_send_timeout > timeout)
|
||||
socket.setSendTimeout(timeout);
|
||||
if (old_receive_timeout > timeout)
|
||||
socket.setReceiveTimeout(timeout);
|
||||
}
|
||||
|
||||
~TimeoutSetter()
|
||||
{
|
||||
socket.setSendTimeout(old_send_timeout);
|
||||
socket.setReceiveTimeout(old_receive_timeout);
|
||||
}
|
||||
|
||||
Poco::Net::StreamSocket & socket;
|
||||
Poco::Timespan timeout;
|
||||
Poco::Timespan old_send_timeout;
|
||||
Poco::Timespan old_receive_timeout;
|
||||
};
|
||||
|
||||
bool Connection::ping()
|
||||
{
|
||||
// LOG_TRACE(log_wrapper.get(), "Ping");
|
||||
|
46
dbms/src/Client/TimeoutSetter.h
Normal file
46
dbms/src/Client/TimeoutSetter.h
Normal file
@ -0,0 +1,46 @@
|
||||
#pragma once
|
||||
|
||||
#include <Poco/Timespan.h>
|
||||
#include <Poco/Net/StreamSocket.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
/// Temporarily overrides socket send/recieve timeouts and reset them back into destructor
|
||||
/// Timeouts could be only decreased
|
||||
struct TimeoutSetter
|
||||
{
|
||||
TimeoutSetter(Poco::Net::StreamSocket & socket_, const Poco::Timespan & send_timeout_, const Poco::Timespan & recieve_timeout_)
|
||||
: socket(socket_), send_timeout(send_timeout_), recieve_timeout(recieve_timeout_)
|
||||
{
|
||||
old_send_timeout = socket.getSendTimeout();
|
||||
old_receive_timeout = socket.getReceiveTimeout();
|
||||
|
||||
if (old_send_timeout > send_timeout)
|
||||
socket.setSendTimeout(send_timeout);
|
||||
|
||||
if (old_receive_timeout > recieve_timeout)
|
||||
socket.setReceiveTimeout(recieve_timeout);
|
||||
}
|
||||
|
||||
TimeoutSetter(Poco::Net::StreamSocket & socket_, const Poco::Timespan & timeout_)
|
||||
: TimeoutSetter(socket_, timeout_, timeout_) {}
|
||||
|
||||
~TimeoutSetter()
|
||||
{
|
||||
socket.setSendTimeout(old_send_timeout);
|
||||
socket.setReceiveTimeout(old_receive_timeout);
|
||||
}
|
||||
|
||||
Poco::Net::StreamSocket & socket;
|
||||
|
||||
Poco::Timespan send_timeout;
|
||||
Poco::Timespan recieve_timeout;
|
||||
|
||||
Poco::Timespan old_send_timeout;
|
||||
Poco::Timespan old_receive_timeout;
|
||||
};
|
||||
|
||||
|
||||
}
|
@ -31,6 +31,7 @@
|
||||
#include "TCPHandler.h"
|
||||
|
||||
#include <Common/NetException.h>
|
||||
#include <ext/scope_guard.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
@ -139,6 +140,9 @@ void TCPHandler::runImpl()
|
||||
/// Restore context of request.
|
||||
query_context = connection_context;
|
||||
|
||||
/// If a user passed query-local timeouts, reset socket to initial state at the end of the query
|
||||
SCOPE_EXIT({state.timeout_setter.reset();});
|
||||
|
||||
/** If Query - process it. If Ping or Cancel - go back to the beginning.
|
||||
* There may come settings for a separate query that modify `query_context`.
|
||||
*/
|
||||
@ -600,7 +604,12 @@ void TCPHandler::receiveQuery()
|
||||
}
|
||||
|
||||
/// Per query settings.
|
||||
query_context.getSettingsRef().deserialize(*in);
|
||||
Settings & settings = query_context.getSettingsRef();
|
||||
settings.deserialize(*in);
|
||||
|
||||
/// Sync timeouts on client and server during current query to avoid dangling queries on server
|
||||
/// NOTE: these settings are applied only for current connection (not for distributed tables' connections)
|
||||
state.timeout_setter = std::make_unique<TimeoutSetter>(socket(), settings.send_timeout, settings.receive_timeout);
|
||||
|
||||
readVarUInt(stage, *in);
|
||||
state.stage = QueryProcessingStage::Enum(stage);
|
||||
|
@ -11,6 +11,7 @@
|
||||
#include <DataStreams/BlockIO.h>
|
||||
#include <IO/ReadHelpers.h>
|
||||
#include <IO/WriteHelpers.h>
|
||||
#include <Client/TimeoutSetter.h>
|
||||
|
||||
#include "IServer.h"
|
||||
|
||||
@ -59,6 +60,9 @@ struct QueryState
|
||||
/// To output progress, the difference after the previous sending of progress.
|
||||
Progress progress;
|
||||
|
||||
/// Timeouts setter for current query
|
||||
std::unique_ptr<TimeoutSetter> timeout_setter;
|
||||
|
||||
|
||||
void reset()
|
||||
{
|
||||
|
Loading…
Reference in New Issue
Block a user