mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-22 15:42:02 +00:00
added http_*_timeout settings [#CLICKHOUSE-3440]
This commit is contained in:
parent
890ad92f90
commit
cbeeb84999
@ -64,9 +64,9 @@ void Connection::connect()
|
||||
{
|
||||
socket = std::make_unique<Poco::Net::StreamSocket>();
|
||||
}
|
||||
socket->connect(resolved_address, connect_timeout);
|
||||
socket->setReceiveTimeout(receive_timeout);
|
||||
socket->setSendTimeout(send_timeout);
|
||||
socket->connect(resolved_address, timeouts.connection_timeout);
|
||||
socket->setReceiveTimeout(timeouts.receive_timeout);
|
||||
socket->setSendTimeout(timeouts.send_timeout);
|
||||
socket->setNoDelay(true);
|
||||
|
||||
in = std::make_shared<ReadBufferFromPocoSocket>(*socket);
|
||||
|
@ -17,6 +17,7 @@
|
||||
#include <DataStreams/BlockStreamProfileInfo.h>
|
||||
|
||||
#include <IO/CompressionSettings.h>
|
||||
#include <IO/ConnectionTimeouts.h>
|
||||
|
||||
#include <Interpreters/Settings.h>
|
||||
#include <Interpreters/TablesStatus.h>
|
||||
@ -54,12 +55,10 @@ class Connection : private boost::noncopyable
|
||||
public:
|
||||
Connection(const String & host_, UInt16 port_, const String & default_database_,
|
||||
const String & user_, const String & password_,
|
||||
const ConnectionTimeouts & timeouts_,
|
||||
const String & client_name_ = "client",
|
||||
Protocol::Compression compression_ = Protocol::Compression::Enable,
|
||||
Protocol::Encryption encryption_ = Protocol::Encryption::Disable,
|
||||
Poco::Timespan connect_timeout_ = Poco::Timespan(DBMS_DEFAULT_CONNECT_TIMEOUT_SEC, 0),
|
||||
Poco::Timespan receive_timeout_ = Poco::Timespan(DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC, 0),
|
||||
Poco::Timespan send_timeout_ = Poco::Timespan(DBMS_DEFAULT_SEND_TIMEOUT_SEC, 0),
|
||||
Poco::Timespan sync_request_timeout_ = Poco::Timespan(DBMS_DEFAULT_SYNC_REQUEST_TIMEOUT_SEC, 0))
|
||||
:
|
||||
host(host_), port(port_), default_database(default_database_),
|
||||
@ -67,7 +66,7 @@ public:
|
||||
client_name(client_name_),
|
||||
compression(compression_),
|
||||
encryption(encryption_),
|
||||
connect_timeout(connect_timeout_), receive_timeout(receive_timeout_), send_timeout(send_timeout_),
|
||||
timeouts(timeouts_),
|
||||
sync_request_timeout(sync_request_timeout_),
|
||||
log_wrapper(*this)
|
||||
{
|
||||
@ -82,12 +81,10 @@ public:
|
||||
Connection(const String & host_, UInt16 port_, const Poco::Net::SocketAddress & resolved_address_,
|
||||
const String & default_database_,
|
||||
const String & user_, const String & password_,
|
||||
const ConnectionTimeouts & timeouts_,
|
||||
const String & client_name_ = "client",
|
||||
Protocol::Compression compression_ = Protocol::Compression::Enable,
|
||||
Protocol::Encryption encryption_ = Protocol::Encryption::Disable,
|
||||
Poco::Timespan connect_timeout_ = Poco::Timespan(DBMS_DEFAULT_CONNECT_TIMEOUT_SEC, 0),
|
||||
Poco::Timespan receive_timeout_ = Poco::Timespan(DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC, 0),
|
||||
Poco::Timespan send_timeout_ = Poco::Timespan(DBMS_DEFAULT_SEND_TIMEOUT_SEC, 0),
|
||||
Poco::Timespan sync_request_timeout_ = Poco::Timespan(DBMS_DEFAULT_SYNC_REQUEST_TIMEOUT_SEC, 0))
|
||||
:
|
||||
host(host_), port(port_),
|
||||
@ -97,7 +94,7 @@ public:
|
||||
client_name(client_name_),
|
||||
compression(compression_),
|
||||
encryption(encryption_),
|
||||
connect_timeout(connect_timeout_), receive_timeout(receive_timeout_), send_timeout(send_timeout_),
|
||||
timeouts(timeouts_),
|
||||
sync_request_timeout(sync_request_timeout_),
|
||||
log_wrapper(*this)
|
||||
{
|
||||
@ -233,9 +230,7 @@ private:
|
||||
*/
|
||||
ThrottlerPtr throttler;
|
||||
|
||||
Poco::Timespan connect_timeout;
|
||||
Poco::Timespan receive_timeout;
|
||||
Poco::Timespan send_timeout;
|
||||
ConnectionTimeouts timeouts;
|
||||
Poco::Timespan sync_request_timeout;
|
||||
|
||||
/// From where to read query execution result.
|
||||
|
@ -3,7 +3,7 @@
|
||||
#include <Common/PoolBase.h>
|
||||
|
||||
#include <Client/Connection.h>
|
||||
|
||||
#include <IO/ConnectionTimeouts.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
@ -48,17 +48,15 @@ public:
|
||||
const String & host_, UInt16 port_,
|
||||
const String & default_database_,
|
||||
const String & user_, const String & password_,
|
||||
const ConnectionTimeouts & timeouts,
|
||||
const String & client_name_ = "client",
|
||||
Protocol::Compression compression_ = Protocol::Compression::Enable,
|
||||
Protocol::Encryption encryption_ = Protocol::Encryption::Disable,
|
||||
Poco::Timespan connect_timeout_ = Poco::Timespan(DBMS_DEFAULT_CONNECT_TIMEOUT_SEC, 0),
|
||||
Poco::Timespan receive_timeout_ = Poco::Timespan(DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC, 0),
|
||||
Poco::Timespan send_timeout_ = Poco::Timespan(DBMS_DEFAULT_SEND_TIMEOUT_SEC, 0))
|
||||
Protocol::Encryption encryption_ = Protocol::Encryption::Disable)
|
||||
: Base(max_connections_, &Logger::get("ConnectionPool (" + host_ + ":" + toString(port_) + ")")),
|
||||
host(host_), port(port_), default_database(default_database_),
|
||||
user(user_), password(password_), resolved_address(host_, port_),
|
||||
client_name(client_name_), compression(compression_), encryption(encryption_),
|
||||
connect_timeout(connect_timeout_), receive_timeout(receive_timeout_), send_timeout(send_timeout_)
|
||||
timeouts(timeouts)
|
||||
{
|
||||
}
|
||||
|
||||
@ -66,17 +64,15 @@ public:
|
||||
const String & host_, UInt16 port_, const Poco::Net::SocketAddress & resolved_address_,
|
||||
const String & default_database_,
|
||||
const String & user_, const String & password_,
|
||||
const ConnectionTimeouts & timeouts,
|
||||
const String & client_name_ = "client",
|
||||
Protocol::Compression compression_ = Protocol::Compression::Enable,
|
||||
Protocol::Encryption encryption_ = Protocol::Encryption::Disable,
|
||||
Poco::Timespan connect_timeout_ = Poco::Timespan(DBMS_DEFAULT_CONNECT_TIMEOUT_SEC, 0),
|
||||
Poco::Timespan receive_timeout_ = Poco::Timespan(DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC, 0),
|
||||
Poco::Timespan send_timeout_ = Poco::Timespan(DBMS_DEFAULT_SEND_TIMEOUT_SEC, 0))
|
||||
Protocol::Encryption encryption_ = Protocol::Encryption::Disable)
|
||||
: Base(max_connections_, &Logger::get("ConnectionPool (" + host_ + ":" + toString(port_) + ")")),
|
||||
host(host_), port(port_), default_database(default_database_),
|
||||
user(user_), password(password_), resolved_address(resolved_address_),
|
||||
client_name(client_name_), compression(compression_), encryption(encryption_),
|
||||
connect_timeout(connect_timeout_), receive_timeout(receive_timeout_), send_timeout(send_timeout_)
|
||||
timeouts(timeouts)
|
||||
{
|
||||
}
|
||||
|
||||
@ -105,9 +101,8 @@ protected:
|
||||
{
|
||||
return std::make_shared<Connection>(
|
||||
host, port, resolved_address,
|
||||
default_database, user, password,
|
||||
client_name, compression, encryption,
|
||||
connect_timeout, receive_timeout, send_timeout);
|
||||
default_database, user, password, timeouts,
|
||||
client_name, compression, encryption);
|
||||
}
|
||||
|
||||
private:
|
||||
@ -126,9 +121,7 @@ private:
|
||||
Protocol::Compression compression; /// Whether to compress data when interacting with the server.
|
||||
Protocol::Encryption encryption; /// Whether to encrypt data when interacting with the server.
|
||||
|
||||
Poco::Timespan connect_timeout;
|
||||
Poco::Timespan receive_timeout;
|
||||
Poco::Timespan send_timeout;
|
||||
ConnectionTimeouts timeouts;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -71,6 +71,9 @@
|
||||
|
||||
#define DEFAULT_QUERY_LOG_FLUSH_INTERVAL_MILLISECONDS 7500
|
||||
|
||||
#define DEFAULT_HTTP_READ_BUFFER_TIMEOUT 1800
|
||||
#define DEFAULT_HTTP_READ_BUFFER_CONNECTION_TIMEOUT 1
|
||||
|
||||
#define ALWAYS_INLINE __attribute__((__always_inline__))
|
||||
#define NO_INLINE __attribute__((__noinline__))
|
||||
|
||||
|
@ -7,7 +7,7 @@
|
||||
#include <Common/isLocalAddress.h>
|
||||
#include <memory>
|
||||
#include <ext/range.h>
|
||||
|
||||
#include <IO/ConnectionTimeouts.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
@ -21,11 +21,13 @@ namespace ErrorCodes
|
||||
static const size_t MAX_CONNECTIONS = 16;
|
||||
|
||||
static ConnectionPoolWithFailoverPtr createPool(
|
||||
const std::string & host, UInt16 port, const std::string & db, const std::string & user, const std::string & password)
|
||||
const std::string & host, UInt16 port, const std::string & db,
|
||||
const std::string & user, const std::string & password, const Context & context)
|
||||
{
|
||||
auto timeouts = ConnectionTimeouts::getTCPTimeouts(context.getSettingsRef());
|
||||
ConnectionPoolPtrs pools;
|
||||
pools.emplace_back(std::make_shared<ConnectionPool>(
|
||||
MAX_CONNECTIONS, host, port, db, user, password, "ClickHouseDictionarySource"));
|
||||
MAX_CONNECTIONS, host, port, db, user, password, timeouts, "ClickHouseDictionarySource"));
|
||||
return std::make_shared<ConnectionPoolWithFailover>(pools, LoadBalancing::RANDOM);
|
||||
}
|
||||
|
||||
@ -46,7 +48,7 @@ ClickHouseDictionarySource::ClickHouseDictionarySource(
|
||||
query_builder{dict_struct, db, table, where, ExternalQueryBuilder::Backticks},
|
||||
sample_block{sample_block}, context(context),
|
||||
is_local{isLocalAddress({ host, port }, config.getInt("tcp_port", 0))},
|
||||
pool{is_local ? nullptr : createPool(host, port, db, user, password)},
|
||||
pool{is_local ? nullptr : createPool(host, port, db, user, password, context)},
|
||||
load_all_query{query_builder.composeLoadAllQuery()}
|
||||
{}
|
||||
|
||||
@ -59,7 +61,7 @@ ClickHouseDictionarySource::ClickHouseDictionarySource(const ClickHouseDictionar
|
||||
query_builder{dict_struct, db, table, where, ExternalQueryBuilder::Backticks},
|
||||
sample_block{other.sample_block}, context(other.context),
|
||||
is_local{other.is_local},
|
||||
pool{is_local ? nullptr : createPool(host, port, db, user, password)},
|
||||
pool{is_local ? nullptr : createPool(host, port, db, user, password, context)},
|
||||
load_all_query{other.load_all_query}
|
||||
{}
|
||||
|
||||
|
@ -8,7 +8,7 @@
|
||||
#include <IO/WriteBufferFromOStream.h>
|
||||
#include <Dictionaries/DictionarySourceHelpers.h>
|
||||
#include <common/logger_useful.h>
|
||||
|
||||
#include <IO/ConnectionTimeouts.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
@ -24,7 +24,8 @@ HTTPDictionarySource::HTTPDictionarySource(const DictionaryStructure & dict_stru
|
||||
url{config.getString(config_prefix + ".url", "")},
|
||||
format{config.getString(config_prefix + ".format")},
|
||||
sample_block{sample_block},
|
||||
context(context)
|
||||
context(context),
|
||||
timeouts(ConnectionTimeouts::getHTTPTimeouts(context.getSettingsRef()))
|
||||
{
|
||||
}
|
||||
|
||||
@ -34,7 +35,8 @@ HTTPDictionarySource::HTTPDictionarySource(const HTTPDictionarySource & other)
|
||||
url{other.url},
|
||||
format{other.format},
|
||||
sample_block{other.sample_block},
|
||||
context(other.context)
|
||||
context(other.context),
|
||||
timeouts(ConnectionTimeouts::getHTTPTimeouts(context.getSettingsRef()))
|
||||
{
|
||||
}
|
||||
|
||||
@ -42,7 +44,8 @@ BlockInputStreamPtr HTTPDictionarySource::loadAll()
|
||||
{
|
||||
LOG_TRACE(log, "loadAll " + toString());
|
||||
Poco::URI uri(url);
|
||||
auto in_ptr = std::make_unique<ReadWriteBufferFromHTTP>(uri, Poco::Net::HTTPRequest::HTTP_GET);
|
||||
auto in_ptr = std::make_unique<ReadWriteBufferFromHTTP>(uri, Poco::Net::HTTPRequest::HTTP_GET,
|
||||
ReadWriteBufferFromHTTP::OutStreamCallback(), timeouts);
|
||||
auto input_stream = context.getInputFormat(format, *in_ptr, sample_block, max_block_size);
|
||||
return std::make_shared<OwningBlockInputStream<ReadWriteBufferFromHTTP>>(input_stream, std::move(in_ptr));
|
||||
}
|
||||
@ -59,7 +62,8 @@ BlockInputStreamPtr HTTPDictionarySource::loadIds(const std::vector<UInt64> & id
|
||||
};
|
||||
|
||||
Poco::URI uri(url);
|
||||
auto in_ptr = std::make_unique<ReadWriteBufferFromHTTP>(uri, Poco::Net::HTTPRequest::HTTP_POST, out_stream_callback);
|
||||
auto in_ptr = std::make_unique<ReadWriteBufferFromHTTP>(uri, Poco::Net::HTTPRequest::HTTP_POST,
|
||||
out_stream_callback, timeouts);
|
||||
auto input_stream = context.getInputFormat(format, *in_ptr, sample_block, max_block_size);
|
||||
return std::make_shared<OwningBlockInputStream<ReadWriteBufferFromHTTP>>(input_stream, std::move(in_ptr));
|
||||
}
|
||||
@ -77,7 +81,8 @@ BlockInputStreamPtr HTTPDictionarySource::loadKeys(
|
||||
};
|
||||
|
||||
Poco::URI uri(url);
|
||||
auto in_ptr = std::make_unique<ReadWriteBufferFromHTTP>(uri, Poco::Net::HTTPRequest::HTTP_POST, out_stream_callback);
|
||||
auto in_ptr = std::make_unique<ReadWriteBufferFromHTTP>(uri, Poco::Net::HTTPRequest::HTTP_POST,
|
||||
out_stream_callback, timeouts);
|
||||
auto input_stream = context.getInputFormat(format, *in_ptr, sample_block, max_block_size);
|
||||
return std::make_shared<OwningBlockInputStream<ReadWriteBufferFromHTTP>>(input_stream, std::move(in_ptr));
|
||||
}
|
||||
|
@ -3,7 +3,7 @@
|
||||
#include <Dictionaries/IDictionarySource.h>
|
||||
#include <Dictionaries/DictionaryStructure.h>
|
||||
#include <common/LocalDateTime.h>
|
||||
|
||||
#include <IO/ConnectionTimeouts.h>
|
||||
|
||||
namespace Poco { class Logger; }
|
||||
|
||||
@ -48,6 +48,7 @@ private:
|
||||
const std::string format;
|
||||
Block sample_block;
|
||||
const Context & context;
|
||||
ConnectionTimeouts timeouts;
|
||||
};
|
||||
|
||||
}
|
||||
|
52
dbms/src/IO/ConnectionTimeouts.h
Normal file
52
dbms/src/IO/ConnectionTimeouts.h
Normal file
@ -0,0 +1,52 @@
|
||||
#pragma once
|
||||
|
||||
#include <Poco/Timespan.h>
|
||||
#include <Interpreters/Settings.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
struct ConnectionTimeouts
|
||||
{
|
||||
Poco::Timespan connection_timeout;
|
||||
Poco::Timespan send_timeout;
|
||||
Poco::Timespan receive_timeout;
|
||||
|
||||
ConnectionTimeouts() = default;
|
||||
|
||||
ConnectionTimeouts(const Poco::Timespan & connection_timeout_,
|
||||
const Poco::Timespan & send_timeout_,
|
||||
const Poco::Timespan & receive_timeout_)
|
||||
: connection_timeout(connection_timeout_),
|
||||
send_timeout(send_timeout_),
|
||||
receive_timeout(receive_timeout_)
|
||||
{
|
||||
}
|
||||
|
||||
static Poco::Timespan saturate(const Poco::Timespan & timespan, const Poco::Timespan & limit)
|
||||
{
|
||||
if (limit.totalMicroseconds() == 0)
|
||||
return timespan;
|
||||
else
|
||||
return (timespan > limit) ? limit : timespan;
|
||||
}
|
||||
|
||||
ConnectionTimeouts getSaturated(const Poco::Timespan & limit) const
|
||||
{
|
||||
return ConnectionTimeouts(saturate(connection_timeout, limit),
|
||||
saturate(send_timeout, limit),
|
||||
saturate(receive_timeout, limit));
|
||||
}
|
||||
|
||||
static ConnectionTimeouts getTCPTimeouts(const Settings & settings)
|
||||
{
|
||||
return ConnectionTimeouts(settings.connect_timeout, settings.send_timeout, settings.receive_timeout);
|
||||
}
|
||||
|
||||
static ConnectionTimeouts getHTTPTimeouts(const Settings & settings)
|
||||
{
|
||||
return ConnectionTimeouts(settings.http_connection_timeout, settings.http_send_timeout, settings.http_receive_timeout);
|
||||
}
|
||||
};
|
||||
|
||||
}
|
@ -27,8 +27,8 @@ namespace ErrorCodes
|
||||
ReadWriteBufferFromHTTP::ReadWriteBufferFromHTTP(const Poco::URI & uri,
|
||||
const std::string & method_,
|
||||
OutStreamCallback out_stream_callback,
|
||||
size_t buffer_size_,
|
||||
const HTTPTimeouts & timeouts)
|
||||
const ConnectionTimeouts & timeouts,
|
||||
size_t buffer_size_)
|
||||
: ReadBuffer(nullptr, 0),
|
||||
uri{uri},
|
||||
method{!method_.empty() ? method_ : out_stream_callback ? Poco::Net::HTTPRequest::HTTP_POST : Poco::Net::HTTPRequest::HTTP_GET},
|
||||
|
@ -4,6 +4,7 @@
|
||||
#include <Poco/Net/HTTPClientSession.h>
|
||||
#include <Poco/URI.h>
|
||||
#include <IO/ReadBuffer.h>
|
||||
#include <IO/ConnectionTimeouts.h>
|
||||
|
||||
#define DEFAULT_HTTP_READ_BUFFER_TIMEOUT 1800
|
||||
#define DEFAULT_HTTP_READ_BUFFER_CONNECTION_TIMEOUT 1
|
||||
@ -13,13 +14,6 @@ namespace DB
|
||||
|
||||
const int HTTP_TOO_MANY_REQUESTS = 429;
|
||||
|
||||
struct HTTPTimeouts
|
||||
{
|
||||
Poco::Timespan connection_timeout = Poco::Timespan(DEFAULT_HTTP_READ_BUFFER_CONNECTION_TIMEOUT, 0);
|
||||
Poco::Timespan send_timeout = Poco::Timespan(DEFAULT_HTTP_READ_BUFFER_TIMEOUT, 0);
|
||||
Poco::Timespan receive_timeout = Poco::Timespan(DEFAULT_HTTP_READ_BUFFER_TIMEOUT, 0);
|
||||
};
|
||||
|
||||
|
||||
/** Perform HTTP POST request and provide response to read.
|
||||
*/
|
||||
@ -28,7 +22,7 @@ class ReadWriteBufferFromHTTP : public ReadBuffer
|
||||
private:
|
||||
Poco::URI uri;
|
||||
std::string method;
|
||||
HTTPTimeouts timeouts;
|
||||
ConnectionTimeouts timeouts;
|
||||
|
||||
bool is_ssl;
|
||||
std::unique_ptr<Poco::Net::HTTPClientSession> session;
|
||||
@ -38,12 +32,12 @@ private:
|
||||
public:
|
||||
using OutStreamCallback = std::function<void(std::ostream &)>;
|
||||
|
||||
ReadWriteBufferFromHTTP(
|
||||
explicit ReadWriteBufferFromHTTP(
|
||||
const Poco::URI & uri,
|
||||
const std::string & method = {},
|
||||
OutStreamCallback out_stream_callback = {},
|
||||
size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE,
|
||||
const HTTPTimeouts & timeouts = {});
|
||||
const ConnectionTimeouts & timeouts = {},
|
||||
size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE);
|
||||
|
||||
bool nextImpl() override;
|
||||
};
|
||||
|
@ -37,9 +37,12 @@ public:
|
||||
{
|
||||
std::make_pair("action", "read"),
|
||||
std::make_pair("path", path),
|
||||
std::make_pair("compress", (compress ? "true" : "false"))});
|
||||
std::make_pair("compress", (compress ? "true" : "false"))
|
||||
});
|
||||
|
||||
impl = std::make_unique<ReadWriteBufferFromHTTP>(uri, std::string(), ReadWriteBufferFromHTTP::OutStreamCallback(), buffer_size, HTTPTimeouts{connection_timeout, send_timeout, receive_timeout});
|
||||
ConnectionTimeouts timeouts(connection_timeout, send_timeout, receive_timeout);
|
||||
ReadWriteBufferFromHTTP::OutStreamCallback callback;
|
||||
impl = std::make_unique<ReadWriteBufferFromHTTP>(uri, std::string(), callback, timeouts, buffer_size);
|
||||
}
|
||||
|
||||
bool nextImpl() override
|
||||
@ -56,7 +59,7 @@ public:
|
||||
const std::string & host,
|
||||
int port,
|
||||
const std::string & path,
|
||||
size_t timeout = 0)
|
||||
const ConnectionTimeouts & timeouts)
|
||||
{
|
||||
Poco::URI uri;
|
||||
uri.setScheme("http");
|
||||
@ -67,7 +70,7 @@ public:
|
||||
std::make_pair("action", "list"),
|
||||
std::make_pair("path", path)});
|
||||
|
||||
ReadWriteBufferFromHTTP in(uri, {}, {}, {}, HTTPTimeouts{timeout});
|
||||
ReadWriteBufferFromHTTP in(uri, {}, {}, timeouts);
|
||||
|
||||
std::vector<std::string> files;
|
||||
while (!in.eof())
|
||||
|
@ -210,10 +210,8 @@ Cluster::Cluster(Poco::Util::AbstractConfiguration & config, const Settings & se
|
||||
settings.distributed_connections_pool_size,
|
||||
address.host_name, address.port, address.resolved_address,
|
||||
address.default_database, address.user, address.password,
|
||||
"server", Protocol::Compression::Enable, Protocol::Encryption::Disable,
|
||||
saturate(settings.connect_timeout, settings.limits.max_execution_time),
|
||||
saturate(settings.receive_timeout, settings.limits.max_execution_time),
|
||||
saturate(settings.send_timeout, settings.limits.max_execution_time)));
|
||||
ConnectionTimeouts::getTCPTimeouts(settings).getSaturated(settings.limits.max_execution_time),
|
||||
"server", Protocol::Compression::Enable, Protocol::Encryption::Disable));
|
||||
|
||||
info.pool = std::make_shared<ConnectionPoolWithFailover>(
|
||||
std::move(pools), settings.load_balancing, settings.connections_with_failover_max_tries);
|
||||
@ -289,10 +287,8 @@ Cluster::Cluster(Poco::Util::AbstractConfiguration & config, const Settings & se
|
||||
settings.distributed_connections_pool_size,
|
||||
replica.host_name, replica.port, replica.resolved_address,
|
||||
replica.default_database, replica.user, replica.password,
|
||||
"server", Protocol::Compression::Enable, Protocol::Encryption::Disable,
|
||||
saturate(settings.connect_timeout_with_failover_ms, settings.limits.max_execution_time),
|
||||
saturate(settings.receive_timeout, settings.limits.max_execution_time),
|
||||
saturate(settings.send_timeout, settings.limits.max_execution_time)));
|
||||
ConnectionTimeouts::getTCPTimeouts(settings).getSaturated(settings.limits.max_execution_time),
|
||||
"server", Protocol::Compression::Enable, Protocol::Encryption::Disable));
|
||||
}
|
||||
}
|
||||
|
||||
@ -348,10 +344,8 @@ Cluster::Cluster(const Settings & settings, const std::vector<std::vector<String
|
||||
settings.distributed_connections_pool_size,
|
||||
replica.host_name, replica.port, replica.resolved_address,
|
||||
replica.default_database, replica.user, replica.password,
|
||||
"server", Protocol::Compression::Enable, Protocol::Encryption::Disable,
|
||||
saturate(settings.connect_timeout_with_failover_ms, settings.limits.max_execution_time),
|
||||
saturate(settings.receive_timeout, settings.limits.max_execution_time),
|
||||
saturate(settings.send_timeout, settings.limits.max_execution_time)));
|
||||
ConnectionTimeouts::getHTTPTimeouts(settings).getSaturated(settings.limits.max_execution_time),
|
||||
"server", Protocol::Compression::Enable, Protocol::Encryption::Disable));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -24,7 +24,8 @@ BlockInputStreams executeQuery(
|
||||
const std::string query = queryToString(query_ast);
|
||||
|
||||
Settings new_settings = settings;
|
||||
new_settings.queue_max_wait_ms = Cluster::saturate(new_settings.queue_max_wait_ms, settings.limits.max_execution_time);
|
||||
new_settings.queue_max_wait_ms = ConnectionTimeouts::saturate(new_settings.queue_max_wait_ms,
|
||||
settings.limits.max_execution_time);
|
||||
|
||||
/// Does not matter on remote servers, because queries are sent under different user.
|
||||
new_settings.max_concurrent_queries_for_user = 0;
|
||||
|
@ -300,7 +300,11 @@ struct Settings
|
||||
/* Timeout for flushing data from streaming storages. */ \
|
||||
M(SettingMilliseconds, stream_flush_interval_ms, DEFAULT_QUERY_LOG_FLUSH_INTERVAL_MILLISECONDS, "Timeout for flushing data from streaming storages.") \
|
||||
/* Schema identifier (used by schema-based formats) */ \
|
||||
M(SettingString, format_schema, "", "Schema identifier (used by schema-based formats)")
|
||||
M(SettingString, format_schema, "", "Schema identifier (used by schema-based formats)") \
|
||||
\
|
||||
M(SettingSeconds, http_connection_timeout, DEFAULT_HTTP_READ_BUFFER_CONNECTION_TIMEOUT, "HTTP connection timeout.") \
|
||||
M(SettingSeconds, http_send_timeout, DEFAULT_HTTP_READ_BUFFER_TIMEOUT, "HTTP send timeout") \
|
||||
M(SettingSeconds, http_receive_timeout, DEFAULT_HTTP_READ_BUFFER_TIMEOUT, "HTTP receive timeout") \
|
||||
|
||||
|
||||
/// Possible limits for query execution.
|
||||
|
@ -32,6 +32,7 @@
|
||||
#include <IO/ReadHelpers.h>
|
||||
#include <IO/WriteHelpers.h>
|
||||
#include <IO/Operators.h>
|
||||
#include <IO/ConnectionTimeouts.h>
|
||||
|
||||
#include <DataStreams/RemoteBlockInputStream.h>
|
||||
|
||||
@ -64,10 +65,10 @@ public:
|
||||
const String & host_, UInt16 port_, const String & default_database_,
|
||||
const String & user_, const String & password_, const String & stage,
|
||||
bool randomize_, size_t max_iterations_, double max_time_,
|
||||
const String & json_path_, const Settings & settings_)
|
||||
const String & json_path_, const ConnectionTimeouts & timeouts, const Settings & settings_)
|
||||
:
|
||||
concurrency(concurrency_), delay(delay_), queue(concurrency),
|
||||
connections(concurrency, host_, port_, default_database_, user_, password_),
|
||||
connections(concurrency, host_, port_, default_database_, user_, password_, timeouts),
|
||||
randomize(randomize_), max_iterations(max_iterations_), max_time(max_time_),
|
||||
json_path(json_path_), settings(settings_), global_context(Context::createGlobal()), pool(concurrency)
|
||||
{
|
||||
@ -482,6 +483,7 @@ int mainEntryClickHouseBenchmark(int argc, char ** argv)
|
||||
options["iterations"].as<size_t>(),
|
||||
options["timelimit"].as<double>(),
|
||||
options["json"].as<std::string>(),
|
||||
ConnectionTimeouts::getTCPTimeouts(settings),
|
||||
settings);
|
||||
}
|
||||
catch (...)
|
||||
|
@ -385,7 +385,9 @@ private:
|
||||
: Protocol::Encryption::Disable;
|
||||
|
||||
String host = config().getString("host", "localhost");
|
||||
UInt16 port = config().getInt("port", config().getInt(static_cast<bool>(encryption) ? "tcp_ssl_port" : "tcp_port", static_cast<bool>(encryption) ? DBMS_DEFAULT_SECURE_PORT : DBMS_DEFAULT_PORT));
|
||||
UInt16 port = config().getInt("port", config().getInt(
|
||||
static_cast<bool>(encryption) ? "tcp_ssl_port" : "tcp_port",
|
||||
static_cast<bool>(encryption) ? DBMS_DEFAULT_SECURE_PORT : DBMS_DEFAULT_PORT));
|
||||
String default_database = config().getString("database", "");
|
||||
String user = config().getString("user", "");
|
||||
String password = config().getString("password", "");
|
||||
@ -401,11 +403,12 @@ private:
|
||||
<< (!user.empty() ? " as user " + user : "")
|
||||
<< "." << std::endl;
|
||||
|
||||
connection = std::make_unique<Connection>(host, port, default_database, user, password, "client", compression,
|
||||
encryption,
|
||||
ConnectionTimeouts timeouts(
|
||||
Poco::Timespan(config().getInt("connect_timeout", DBMS_DEFAULT_CONNECT_TIMEOUT_SEC), 0),
|
||||
Poco::Timespan(config().getInt("receive_timeout", DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC), 0),
|
||||
Poco::Timespan(config().getInt("send_timeout", DBMS_DEFAULT_SEND_TIMEOUT_SEC), 0));
|
||||
connection = std::make_unique<Connection>(host, port, default_database, user, password, timeouts, "client",
|
||||
compression, encryption);
|
||||
|
||||
String server_name;
|
||||
UInt64 server_version_major = 0;
|
||||
|
@ -23,6 +23,7 @@
|
||||
#include <IO/ReadBufferFromFile.h>
|
||||
#include <IO/ReadHelpers.h>
|
||||
#include <IO/WriteBufferFromFile.h>
|
||||
#include <IO/ConnectionTimeouts.h>
|
||||
#include <Interpreters/Settings.h>
|
||||
#include <common/ThreadPool.h>
|
||||
#include <common/getMemoryAmount.h>
|
||||
@ -503,8 +504,9 @@ public:
|
||||
Strings && tests_names_,
|
||||
Strings && skip_names_,
|
||||
Strings && tests_names_regexp_,
|
||||
Strings && skip_names_regexp_)
|
||||
: connection(host_, port_, default_database_, user_, password_),
|
||||
Strings && skip_names_regexp_,
|
||||
const ConnectionTimeouts & timeouts)
|
||||
: connection(host_, port_, default_database_, user_, password_, timeouts),
|
||||
gotSIGINT(false),
|
||||
lite_output(lite_output_),
|
||||
profiles_file(profiles_file_),
|
||||
@ -1481,6 +1483,8 @@ try
|
||||
Strings tests_names_regexp = options.count("names-regexp") ? options["names-regexp"].as<Strings>() : Strings({});
|
||||
Strings skip_names_regexp = options.count("skip-names-regexp") ? options["skip-names-regexp"].as<Strings>() : Strings({});
|
||||
|
||||
auto timeouts = DB::ConnectionTimeouts::getTCPTimeouts(DB::Settings());
|
||||
|
||||
DB::PerformanceTest performanceTest(options["host"].as<String>(),
|
||||
options["port"].as<UInt16>(),
|
||||
options["database"].as<String>(),
|
||||
@ -1494,7 +1498,8 @@ try
|
||||
std::move(tests_names),
|
||||
std::move(skip_names),
|
||||
std::move(tests_names_regexp),
|
||||
std::move(skip_names_regexp));
|
||||
std::move(skip_names_regexp),
|
||||
timeouts);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
@ -357,8 +357,8 @@ int Server::main(const std::vector<std::string> & /*args*/)
|
||||
{
|
||||
Poco::Net::SocketAddress http_socket_address = make_socket_address(listen_host, config().getInt("http_port"));
|
||||
Poco::Net::ServerSocket http_socket(http_socket_address);
|
||||
http_socket.setReceiveTimeout(settings.receive_timeout);
|
||||
http_socket.setSendTimeout(settings.send_timeout);
|
||||
http_socket.setReceiveTimeout(settings.http_receive_timeout);
|
||||
http_socket.setSendTimeout(settings.http_send_timeout);
|
||||
|
||||
servers.emplace_back(new Poco::Net::HTTPServer(
|
||||
new HTTPHandlerFactory(*this, "HTTPHandler-factory"),
|
||||
@ -376,8 +376,8 @@ int Server::main(const std::vector<std::string> & /*args*/)
|
||||
std::call_once(ssl_init_once, SSLInit);
|
||||
Poco::Net::SocketAddress http_socket_address = make_socket_address(listen_host, config().getInt("https_port"));
|
||||
Poco::Net::SecureServerSocket http_socket(http_socket_address);
|
||||
http_socket.setReceiveTimeout(settings.receive_timeout);
|
||||
http_socket.setSendTimeout(settings.send_timeout);
|
||||
http_socket.setReceiveTimeout(settings.http_receive_timeout);
|
||||
http_socket.setSendTimeout(settings.http_send_timeout);
|
||||
|
||||
servers.emplace_back(new Poco::Net::HTTPServer(
|
||||
new HTTPHandlerFactory(*this, "HTTPHandler-factory"),
|
||||
@ -438,8 +438,8 @@ int Server::main(const std::vector<std::string> & /*args*/)
|
||||
{
|
||||
Poco::Net::SocketAddress interserver_address = make_socket_address(listen_host, config().getInt("interserver_http_port"));
|
||||
Poco::Net::ServerSocket interserver_io_http_socket(interserver_address);
|
||||
interserver_io_http_socket.setReceiveTimeout(settings.receive_timeout);
|
||||
interserver_io_http_socket.setSendTimeout(settings.send_timeout);
|
||||
interserver_io_http_socket.setReceiveTimeout(settings.http_receive_timeout);
|
||||
interserver_io_http_socket.setSendTimeout(settings.http_send_timeout);
|
||||
servers.emplace_back(new Poco::Net::HTTPServer(
|
||||
new InterserverIOHTTPHandlerFactory(*this, "InterserverIOHTTPHandler-factory"),
|
||||
server_pool,
|
||||
|
@ -150,13 +150,14 @@ void StorageDistributedDirectoryMonitor::run()
|
||||
|
||||
ConnectionPoolPtr StorageDistributedDirectoryMonitor::createPool(const std::string & name, const StorageDistributed & storage)
|
||||
{
|
||||
const auto pool_factory = [&storage, &name] (const std::string & host, const UInt16 port,
|
||||
auto timeouts = ConnectionTimeouts::getTCPTimeouts(storage.context.getSettingsRef());
|
||||
const auto pool_factory = [&storage, &name, &timeouts] (const std::string & host, const UInt16 port,
|
||||
const std::string & user, const std::string & password,
|
||||
const std::string & default_database)
|
||||
{
|
||||
return std::make_shared<ConnectionPool>(
|
||||
1, host, port, default_database,
|
||||
user, password,
|
||||
user, password, timeouts,
|
||||
storage.getName() + '_' + name);
|
||||
};
|
||||
|
||||
|
@ -155,6 +155,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(
|
||||
const String & replica_path,
|
||||
const String & host,
|
||||
int port,
|
||||
const ConnectionTimeouts & timeouts,
|
||||
bool to_detached)
|
||||
{
|
||||
Poco::URI uri;
|
||||
@ -168,7 +169,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(
|
||||
{"compress", "false"}
|
||||
});
|
||||
|
||||
ReadWriteBufferFromHTTP in{uri, Poco::Net::HTTPRequest::HTTP_POST};
|
||||
ReadWriteBufferFromHTTP in{uri, Poco::Net::HTTPRequest::HTTP_POST, {}, timeouts};
|
||||
|
||||
static const String TMP_PREFIX = "tmp_fetch_";
|
||||
String relative_part_path = String(to_detached ? "detached/" : "") + TMP_PREFIX + part_name;
|
||||
|
@ -4,6 +4,7 @@
|
||||
#include <Storages/MergeTree/MergeTreeData.h>
|
||||
#include <IO/HashingWriteBuffer.h>
|
||||
#include <IO/copyData.h>
|
||||
#include <IO/ConnectionTimeouts.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
@ -51,6 +52,7 @@ public:
|
||||
const String & replica_path,
|
||||
const String & host,
|
||||
int port,
|
||||
const ConnectionTimeouts & timeouts,
|
||||
bool to_detached = false);
|
||||
|
||||
/// You need to stop the data transfer.
|
||||
|
@ -20,6 +20,7 @@
|
||||
|
||||
#include <IO/ReadBufferFromString.h>
|
||||
#include <IO/Operators.h>
|
||||
#include <IO/ConnectionTimeouts.h>
|
||||
|
||||
#include <Interpreters/InterpreterAlterQuery.h>
|
||||
#include <Interpreters/PartLog.h>
|
||||
@ -2124,8 +2125,9 @@ bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const Strin
|
||||
|
||||
Stopwatch stopwatch;
|
||||
|
||||
auto timeouts = ConnectionTimeouts::getHTTPTimeouts(context.getSettingsRef());
|
||||
MergeTreeData::MutableDataPartPtr part = fetcher.fetchPart(
|
||||
part_name, replica_path, address.host, address.replication_port, to_detached);
|
||||
part_name, replica_path, address.host, address.replication_port, timeouts, to_detached);
|
||||
|
||||
if (!to_detached)
|
||||
{
|
||||
@ -3094,11 +3096,12 @@ void StorageReplicatedMergeTree::sendRequestToLeaderReplica(const ASTPtr & query
|
||||
|
||||
/// NOTE Works only if there is access from the default user without a password. You can fix it by adding a parameter to the server config.
|
||||
|
||||
auto timeouts = ConnectionTimeouts::getTCPTimeouts(context.getSettingsRef());
|
||||
Connection connection(
|
||||
leader_address.host,
|
||||
leader_address.queries_port,
|
||||
leader_address.database,
|
||||
"", "", "ClickHouse replica");
|
||||
"", "", timeouts, "ClickHouse replica");
|
||||
|
||||
RemoteBlockInputStream stream(connection, formattedAST(new_query), context, &settings);
|
||||
NullBlockOutputStream output;
|
||||
|
Loading…
Reference in New Issue
Block a user