Reduce redundancies

This commit is contained in:
Robert Schulze 2023-02-07 12:10:26 +00:00
parent 783e26d2ba
commit 10af0b3e49
No known key found for this signature in database
GPG Key ID: 26703B55FB13728A
10 changed files with 46 additions and 97 deletions

View File

@ -12,10 +12,8 @@ LibraryBridgeHelper::LibraryBridgeHelper(ContextPtr context_)
, http_timeout(context_->getGlobalContext()->getSettingsRef().http_receive_timeout.value)
, bridge_host(config.getString("library_bridge.host", DEFAULT_HOST))
, bridge_port(config.getUInt("library_bridge.port", DEFAULT_PORT))
, http_timeouts(ConnectionTimeouts::getHTTPTimeouts(context_->getSettingsRef(), {context_->getConfigRef().getUInt("keep_alive_timeout", DEFAULT_HTTP_KEEP_ALIVE_TIMEOUT), 0}))
{
const auto & settings = context_->getSettingsRef();
Poco::Timespan http_keep_alive_timeout{config.getUInt("keep_alive_timeout", 10), 0};
http_timeouts = ConnectionTimeouts::getHTTPTimeouts(settings, http_keep_alive_timeout);
}

View File

@ -96,14 +96,9 @@ protected:
bool bridgeHandShake() override
{
const auto & settings = getContext()->getSettingsRef();
const auto & cfg = getContext()->getConfigRef();
Poco::Timespan http_keep_alive_timeout{cfg.getUInt("keep_alive_timeout", 10), 0};
auto timeouts = ConnectionTimeouts::getHTTPTimeouts(settings, http_keep_alive_timeout);
try
{
ReadWriteBufferFromHTTP buf(getPingURI(), Poco::Net::HTTPRequest::HTTP_GET, {}, timeouts, credentials);
ReadWriteBufferFromHTTP buf(getPingURI(), Poco::Net::HTTPRequest::HTTP_GET, {}, getHTTPTimeouts(), credentials);
return checkString(PING_OK_ANSWER, buf);
}
catch (...)
@ -166,6 +161,10 @@ private:
Poco::Net::HTTPBasicCredentials credentials{};
ConnectionTimeouts getHTTPTimeouts()
{
return ConnectionTimeouts::getHTTPTimeouts(getContext()->getSettingsRef(), {getContext()->getConfigRef().getUInt("keep_alive_timeout", DEFAULT_HTTP_KEEP_ALIVE_TIMEOUT), 0});
}
protected:
using URLParams = std::vector<std::pair<std::string, std::string>>;
@ -200,12 +199,7 @@ protected:
uri.addQueryParameter("connection_string", getConnectionString());
uri.addQueryParameter("use_connection_pooling", toString(use_connection_pooling));
const auto & settings = getContext()->getSettingsRef();
const auto & cfg = getContext()->getConfigRef();
Poco::Timespan http_keep_alive_timeout{cfg.getUInt("keep_alive_timeout", 10), 0};
auto timeouts = ConnectionTimeouts::getHTTPTimeouts(settings, http_keep_alive_timeout);
ReadWriteBufferFromHTTP buf(uri, Poco::Net::HTTPRequest::HTTP_POST, {}, timeouts, credentials);
ReadWriteBufferFromHTTP buf(uri, Poco::Net::HTTPRequest::HTTP_POST, {}, getHTTPTimeouts(), credentials);
bool res;
readBoolText(res, buf);
@ -227,12 +221,7 @@ protected:
uri.addQueryParameter("connection_string", getConnectionString());
uri.addQueryParameter("use_connection_pooling", toString(use_connection_pooling));
const auto & settings = getContext()->getSettingsRef();
const auto & cfg = getContext()->getConfigRef();
Poco::Timespan http_keep_alive_timeout{cfg.getUInt("keep_alive_timeout", 10), 0};
auto timeouts = ConnectionTimeouts::getHTTPTimeouts(settings, http_keep_alive_timeout);
ReadWriteBufferFromHTTP buf(uri, Poco::Net::HTTPRequest::HTTP_POST, {}, timeouts, credentials);
ReadWriteBufferFromHTTP buf(uri, Poco::Net::HTTPRequest::HTTP_POST, {}, getHTTPTimeouts(), credentials);
std::string character;
readStringBinary(character, buf);

View File

@ -52,6 +52,8 @@
/// the number is unmotivated
#define DEFAULT_COUNT_OF_HTTP_CONNECTIONS_PER_ENDPOINT 15
#define DEFAULT_HTTP_KEEP_ALIVE_TIMEOUT 10
#define DBMS_DEFAULT_PATH "/var/lib/clickhouse/"
/// Actually, there may be multiple acquisitions of different locks for a given table within one query.

View File

@ -38,12 +38,8 @@ HTTPDictionarySource::HTTPDictionarySource(
, configuration(configuration_)
, sample_block(sample_block_)
, context(context_)
, timeouts(ConnectionTimeouts::getHTTPTimeouts(context->getSettingsRef(), {context->getConfigRef().getUInt("keep_alive_timeout", DEFAULT_HTTP_KEEP_ALIVE_TIMEOUT), 0}))
{
const auto & settings = context->getSettingsRef();
const auto & config = context->getConfigRef();
Poco::Timespan http_keep_alive_timeout{config.getUInt("keep_alive_timeout", 10), 0};
timeouts = ConnectionTimeouts::getHTTPTimeouts(settings, http_keep_alive_timeout);
credentials.setUsername(credentials_.getUsername());
credentials.setPassword(credentials_.getPassword());
}
@ -55,12 +51,8 @@ HTTPDictionarySource::HTTPDictionarySource(const HTTPDictionarySource & other)
, configuration(other.configuration)
, sample_block(other.sample_block)
, context(Context::createCopy(other.context))
, timeouts(ConnectionTimeouts::getHTTPTimeouts(context->getSettingsRef(), {context->getConfigRef().getUInt("keep_alive_timeout", DEFAULT_HTTP_KEEP_ALIVE_TIMEOUT), 0}))
{
const auto & settings = context->getSettingsRef();
const auto & config = context->getConfigRef();
Poco::Timespan http_keep_alive_timeout{config.getUInt("keep_alive_timeout", 10), 0};
timeouts = ConnectionTimeouts::getHTTPTimeouts(settings, http_keep_alive_timeout);
credentials.setUsername(other.credentials.getUsername());
credentials.setPassword(other.credentials.getPassword());
}

View File

@ -76,12 +76,8 @@ XDBCDictionarySource::XDBCDictionarySource(
, load_all_query(query_builder.composeLoadAllQuery())
, bridge_helper(bridge_)
, bridge_url(bridge_helper->getMainURI())
, timeouts(ConnectionTimeouts::getHTTPTimeouts(context_->getSettingsRef(), {context_->getConfigRef().getUInt("keep_alive_timeout", DEFAULT_HTTP_KEEP_ALIVE_TIMEOUT), 0}))
{
const auto & settings = context_->getSettingsRef();
const auto & config = context_->getConfigRef();
Poco::Timespan http_keep_alive_timeout{config.getUInt("keep_alive_timeout", 10), 0};
timeouts = ConnectionTimeouts::getHTTPTimeouts(settings, http_keep_alive_timeout);
auto url_params = bridge_helper->getURLParams(max_block_size);
for (const auto & [name, value] : url_params)
bridge_url.addQueryParameter(name, value);

View File

@ -41,16 +41,14 @@ void WebObjectStorage::initialize(const String & uri_path) const
{
Poco::Net::HTTPBasicCredentials credentials{};
const auto & settings = getContext()->getSettingsRef();
const auto & config = getContext()->getConfigRef();
Poco::Timespan http_keep_alive_timeout{config.getUInt("keep_alive_timeout", 10), 0};
auto timeouts = ConnectionTimeouts::getHTTPTimeouts(settings, http_keep_alive_timeout);
ReadWriteBufferFromHTTP metadata_buf(
Poco::URI(fs::path(uri_path) / ".index"),
Poco::Net::HTTPRequest::HTTP_GET,
ReadWriteBufferFromHTTP::OutStreamCallback(),
timeouts,
ConnectionTimeouts::getHTTPTimeouts(
getContext()->getSettingsRef(),
{getContext()->getConfigRef().getUInt("keep_alive_timeout", DEFAULT_HTTP_KEEP_ALIVE_TIMEOUT), 0}),
credentials,
/* max_redirects= */ 0,
/* buffer_size_= */ DBMS_DEFAULT_BUFFER_SIZE,

View File

@ -232,6 +232,11 @@ zkutil::ZooKeeperPtr StorageReplicatedMergeTree::getZooKeeperAndAssertNotReadonl
return res;
}
static ConnectionTimeouts getHTTPTimeouts(ContextPtr context)
{
return ConnectionTimeouts::getHTTPTimeouts(context->getSettingsRef(), {context->getConfigRef().getUInt("keep_alive_timeout", DEFAULT_HTTP_KEEP_ALIVE_TIMEOUT), 0});
}
static MergeTreePartInfo makeDummyDropRangeForMovePartitionOrAttachPartitionFrom(const String & partition_id)
{
/// NOTE We don't have special log entry type for MOVE PARTITION/ATTACH PARTITION FROM,
@ -2355,7 +2360,6 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry)
return true;
}
void StorageReplicatedMergeTree::executeClonePartFromShard(const LogEntry & entry)
{
auto zookeeper = getZooKeeper();
@ -2384,10 +2388,7 @@ void StorageReplicatedMergeTree::executeClonePartFromShard(const LogEntry & entr
auto metadata_snapshot = getInMemoryMetadataPtr();
String source_replica_path = entry.source_shard + "/replicas/" + replica;
ReplicatedMergeTreeAddress address(getZooKeeper()->get(source_replica_path + "/host"));
const auto & settings = getContext()->getSettingsRef();
const auto & config = getContext()->getConfigRef();
Poco::Timespan http_keep_alive_timeout{config.getUInt("keep_alive_timeout", 10), 0};
auto timeouts = ConnectionTimeouts::getHTTPTimeouts(settings, http_keep_alive_timeout);
auto timeouts = getHTTPTimeouts(getContext());
auto credentials = getContext()->getInterserverCredentials();
String interserver_scheme = getContext()->getInterserverScheme();
@ -3616,11 +3617,7 @@ void StorageReplicatedMergeTree::stopBeingLeader()
ConnectionTimeouts StorageReplicatedMergeTree::getFetchPartHTTPTimeouts(ContextPtr local_context)
{
const auto & sts = local_context->getSettingsRef();
const auto & config = local_context->getConfigRef();
Poco::Timespan http_keep_alive_timeout{config.getUInt("keep_alive_timeout", 10), 0};
auto timeouts = ConnectionTimeouts::getHTTPTimeouts(sts, http_keep_alive_timeout);
auto timeouts = getHTTPTimeouts(local_context);
auto settings = getSettings();
if (settings->replicated_fetches_http_connection_timeout.changed)
@ -4267,10 +4264,7 @@ MutableDataPartStoragePtr StorageReplicatedMergeTree::fetchExistsPart(
std::function<MutableDataPartPtr()> get_part;
ReplicatedMergeTreeAddress address(zookeeper->get(fs::path(source_replica_path) / "host"));
const auto & settings = getContext()->getSettingsRef();
const auto & config = getContext()->getConfigRef();
Poco::Timespan http_keep_alive_timeout{config.getUInt("keep_alive_timeout", 10), 0};
auto timeouts = ConnectionTimeouts::getHTTPTimeouts(settings, http_keep_alive_timeout);
auto timeouts = getHTTPTimeouts(getContext());
auto credentials = getContext()->getInterserverCredentials();
String interserver_scheme = getContext()->getInterserverScheme();

View File

@ -81,6 +81,10 @@ static bool urlWithGlobs(const String & uri)
return (uri.find('{') != std::string::npos && uri.find('}') != std::string::npos) || uri.find('|') != std::string::npos;
}
static ConnectionTimeouts getHTTPTimeouts(ContextPtr context)
{
return ConnectionTimeouts::getHTTPTimeouts(context->getSettingsRef(), {context->getConfigRef().getUInt("keep_alive_timeout", DEFAULT_HTTP_KEEP_ALIVE_TIMEOUT), 0});
}
IStorageURLBase::IStorageURLBase(
const String & uri_,
@ -624,11 +628,6 @@ ColumnsDescription IStorageURLBase::getTableStructureFromData(
if (it == urls_to_check.cend())
return nullptr;
const auto & settings = context->getSettingsRef();
const auto & config = context->getConfigRef();
Poco::Timespan http_keep_alive_timeout{config.getUInt("keep_alive_timeout", 10), 0};
auto timeouts = ConnectionTimeouts::getHTTPTimeouts(settings, http_keep_alive_timeout);
auto buf = StorageURLSource::getFirstAvailableURLReadBuffer(
it,
urls_to_check.cend(),
@ -636,7 +635,7 @@ ColumnsDescription IStorageURLBase::getTableStructureFromData(
{},
Poco::Net::HTTPRequest::HTTP_GET,
{},
timeouts,
getHTTPTimeouts(context),
compression_method,
credentials,
headers,
@ -690,11 +689,6 @@ Pipe IStorageURLBase::read(
size_t max_download_threads = local_context->getSettingsRef().max_download_threads;
const auto & settings = local_context->getSettingsRef();
const auto & config = local_context->getConfigRef();
Poco::Timespan http_keep_alive_timeout{config.getUInt("keep_alive_timeout", 10), 0};
auto timeouts = ConnectionTimeouts::getHTTPTimeouts(settings, http_keep_alive_timeout);
if (urlWithGlobs(uri))
{
size_t max_addresses = local_context->getSettingsRef().glob_expansion_max_elements;
@ -725,7 +719,7 @@ Pipe IStorageURLBase::read(
local_context,
columns_description,
max_block_size,
timeouts,
getHTTPTimeouts(local_context),
compression_method,
download_threads,
headers,
@ -749,7 +743,7 @@ Pipe IStorageURLBase::read(
local_context,
columns_description,
max_block_size,
timeouts,
getHTTPTimeouts(local_context),
compression_method,
max_download_threads,
headers,
@ -785,11 +779,6 @@ Pipe StorageURLWithFailover::read(
auto uri_info = std::make_shared<StorageURLSource::URIInfo>();
uri_info->uri_list_to_read.emplace_back(uri_options);
const auto & settings = local_context->getSettingsRef();
const auto & config = local_context->getConfigRef();
Poco::Timespan http_keep_alive_timeout{config.getUInt("keep_alive_timeout", 10), 0};
auto timeouts = ConnectionTimeouts::getHTTPTimeouts(settings, http_keep_alive_timeout);
auto pipe = Pipe(std::make_shared<StorageURLSource>(
uri_info,
getReadMethod(),
@ -801,7 +790,7 @@ Pipe StorageURLWithFailover::read(
local_context,
columns_description,
max_block_size,
timeouts,
getHTTPTimeouts(local_context),
compression_method,
local_context->getSettingsRef().max_download_threads,
headers,
@ -821,11 +810,6 @@ SinkToStoragePtr IStorageURLBase::write(const ASTPtr & query, const StorageMetad
auto partition_by_ast = insert_query ? (insert_query->partition_by ? insert_query->partition_by : partition_by) : nullptr;
bool is_partitioned_implementation = partition_by_ast && has_wildcards;
const auto & settings = context->getSettingsRef();
const auto & config = context->getConfigRef();
Poco::Timespan http_keep_alive_timeout{config.getUInt("keep_alive_timeout", 10), 0};
auto timeouts = ConnectionTimeouts::getHTTPTimeouts(settings, http_keep_alive_timeout);
if (is_partitioned_implementation)
{
return std::make_shared<PartitionedStorageURLSink>(
@ -835,7 +819,7 @@ SinkToStoragePtr IStorageURLBase::write(const ASTPtr & query, const StorageMetad
format_settings,
metadata_snapshot->getSampleBlock(),
context,
timeouts,
getHTTPTimeouts(context),
compression_method,
http_method);
}
@ -847,7 +831,7 @@ SinkToStoragePtr IStorageURLBase::write(const ASTPtr & query, const StorageMetad
format_settings,
metadata_snapshot->getSampleBlock(),
context,
timeouts,
getHTTPTimeouts(context),
compression_method,
http_method);
}
@ -910,17 +894,13 @@ std::optional<time_t> IStorageURLBase::getLastModificationTime(
{
auto settings = context->getSettingsRef();
const auto & config = context->getConfigRef();
Poco::Timespan http_keep_alive_timeout{config.getUInt("keep_alive_timeout", 10), 0};
auto timeouts = ConnectionTimeouts::getHTTPTimeouts(settings, http_keep_alive_timeout);
try
{
ReadWriteBufferFromHTTP buf(
Poco::URI(url),
Poco::Net::HTTPRequest::HTTP_GET,
{},
timeouts,
getHTTPTimeouts(context),
credentials,
settings.max_http_get_redirects,
settings.max_read_buffer_size,

View File

@ -130,10 +130,6 @@ SinkToStoragePtr StorageXDBC::write(const ASTPtr & /* query */, const StorageMet
request_uri.addQueryParameter("format_name", format_name);
request_uri.addQueryParameter("sample_block", metadata_snapshot->getSampleBlock().getNamesAndTypesList().toString());
const auto & settings = local_context->getSettingsRef();
const auto & config = local_context->getConfigRef();
Poco::Timespan http_keep_alive_timeout{config.getUInt("keep_alive_timeout", 10), 0};
auto timeouts = ConnectionTimeouts::getHTTPTimeouts(settings, http_keep_alive_timeout);
return std::make_shared<StorageURLSink>(
request_uri.toString(),
@ -141,7 +137,9 @@ SinkToStoragePtr StorageXDBC::write(const ASTPtr & /* query */, const StorageMet
getFormatSettings(local_context),
metadata_snapshot->getSampleBlock(),
local_context,
timeouts,
ConnectionTimeouts::getHTTPTimeouts(
local_context->getSettingsRef(),
{local_context->getConfigRef().getUInt("keep_alive_timeout", DEFAULT_HTTP_KEEP_ALIVE_TIMEOUT), 0}),
compression_method);
}

View File

@ -75,13 +75,15 @@ ColumnsDescription ITableFunctionXDBC::getActualTableStructure(ContextPtr contex
bool use_nulls = context->getSettingsRef().external_table_functions_use_nulls;
columns_info_uri.addQueryParameter("external_table_functions_use_nulls", toString(use_nulls));
const auto & settings = context->getSettingsRef();
const auto & config = context->getConfigRef();
Poco::Timespan http_keep_alive_timeout{config.getUInt("keep_alive_timeout", 10), 0};
auto timeouts = ConnectionTimeouts::getHTTPTimeouts(settings, http_keep_alive_timeout);
Poco::Net::HTTPBasicCredentials credentials{};
ReadWriteBufferFromHTTP buf(columns_info_uri, Poco::Net::HTTPRequest::HTTP_POST, {}, timeouts, credentials);
ReadWriteBufferFromHTTP buf(
columns_info_uri,
Poco::Net::HTTPRequest::HTTP_POST,
{},
ConnectionTimeouts::getHTTPTimeouts(
context->getSettingsRef(),
{context->getConfigRef().getUInt("keep_alive_timeout", DEFAULT_HTTP_KEEP_ALIVE_TIMEOUT), 0}),
credentials);
std::string columns_info;
readStringBinary(columns_info, buf);