mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-27 18:12:02 +00:00
Minor changes (#54171)
This commit is contained in:
parent
776f232ec0
commit
b3319f7908
@ -7,8 +7,6 @@
|
||||
#include <Common/StringUtils/StringUtils.h>
|
||||
#include <Common/logger_useful.h>
|
||||
|
||||
#include <Interpreters/Context.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
@ -143,13 +141,9 @@ namespace
|
||||
}
|
||||
}
|
||||
|
||||
std::shared_ptr<ProxyConfigurationResolver> ProxyConfigurationResolverProvider::get(Protocol protocol)
|
||||
std::shared_ptr<ProxyConfigurationResolver> ProxyConfigurationResolverProvider::get(Protocol protocol, const Poco::Util::AbstractConfiguration & configuration)
|
||||
{
|
||||
auto context = Context::getGlobalContextInstance();
|
||||
|
||||
chassert(context);
|
||||
|
||||
if (auto resolver = getFromSettings(protocol, "", context->getConfigRef()))
|
||||
if (auto resolver = getFromSettings(protocol, "", configuration))
|
||||
{
|
||||
return resolver;
|
||||
}
|
||||
@ -202,7 +196,7 @@ std::shared_ptr<ProxyConfigurationResolver> ProxyConfigurationResolverProvider::
|
||||
* In case the combination of config_prefix and configuration does not provide a resolver, try to get it from general / new settings.
|
||||
* Falls back to Environment resolver if no configuration is found.
|
||||
* */
|
||||
return ProxyConfigurationResolverProvider::get(Protocol::ANY);
|
||||
return ProxyConfigurationResolverProvider::get(Protocol::ANY, configuration);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -18,7 +18,9 @@ public:
|
||||
* Returns appropriate ProxyConfigurationResolver based on current CH settings (Remote resolver or List resolver).
|
||||
* If no configuration is found, returns Environment Resolver.
|
||||
* */
|
||||
static std::shared_ptr<ProxyConfigurationResolver> get(Protocol protocol);
|
||||
static std::shared_ptr<ProxyConfigurationResolver> get(
|
||||
Protocol protocol,
|
||||
const Poco::Util::AbstractConfiguration & configuration);
|
||||
|
||||
/*
|
||||
* This API exists exclusively for backward compatibility with old S3 storage specific proxy configuration.
|
||||
|
@ -32,9 +32,10 @@ Poco::URI https_list_proxy_server = Poco::URI("http://https_list_proxy:3128");
|
||||
TEST_F(ProxyConfigurationResolverProviderTests, EnvironmentResolverShouldBeUsedIfNoSettings)
|
||||
{
|
||||
EnvironmentProxySetter setter(http_env_proxy_server, https_env_proxy_server);
|
||||
const auto & config = getContext().context->getConfigRef();
|
||||
|
||||
auto http_configuration = DB::ProxyConfigurationResolverProvider::get(DB::ProxyConfiguration::Protocol::HTTP)->resolve();
|
||||
auto https_configuration = DB::ProxyConfigurationResolverProvider::get(DB::ProxyConfiguration::Protocol::HTTPS)->resolve();
|
||||
auto http_configuration = DB::ProxyConfigurationResolverProvider::get(DB::ProxyConfiguration::Protocol::HTTP, config)->resolve();
|
||||
auto https_configuration = DB::ProxyConfigurationResolverProvider::get(DB::ProxyConfiguration::Protocol::HTTPS, config)->resolve();
|
||||
|
||||
ASSERT_EQ(http_configuration.host, http_env_proxy_server.getHost());
|
||||
ASSERT_EQ(http_configuration.port, http_env_proxy_server.getPort());
|
||||
@ -54,13 +55,13 @@ TEST_F(ProxyConfigurationResolverProviderTests, ListHTTPOnly)
|
||||
config->setString("proxy.http.uri", http_list_proxy_server.toString());
|
||||
context->setConfig(config);
|
||||
|
||||
auto http_proxy_configuration = DB::ProxyConfigurationResolverProvider::get(DB::ProxyConfiguration::Protocol::HTTP)->resolve();
|
||||
auto http_proxy_configuration = DB::ProxyConfigurationResolverProvider::get(DB::ProxyConfiguration::Protocol::HTTP, *config)->resolve();
|
||||
|
||||
ASSERT_EQ(http_proxy_configuration.host, http_list_proxy_server.getHost());
|
||||
ASSERT_EQ(http_proxy_configuration.port, http_list_proxy_server.getPort());
|
||||
ASSERT_EQ(http_proxy_configuration.protocol, DB::ProxyConfiguration::protocolFromString(http_list_proxy_server.getScheme()));
|
||||
|
||||
auto https_proxy_configuration = DB::ProxyConfigurationResolverProvider::get(DB::ProxyConfiguration::Protocol::HTTPS)->resolve();
|
||||
auto https_proxy_configuration = DB::ProxyConfigurationResolverProvider::get(DB::ProxyConfiguration::Protocol::HTTPS, *config)->resolve();
|
||||
|
||||
// No https configuration since it's not set
|
||||
ASSERT_EQ(https_proxy_configuration.host, "");
|
||||
@ -76,12 +77,12 @@ TEST_F(ProxyConfigurationResolverProviderTests, ListHTTPSOnly)
|
||||
config->setString("proxy.https.uri", https_list_proxy_server.toString());
|
||||
context->setConfig(config);
|
||||
|
||||
auto http_proxy_configuration = DB::ProxyConfigurationResolverProvider::get(DB::ProxyConfiguration::Protocol::HTTP)->resolve();
|
||||
auto http_proxy_configuration = DB::ProxyConfigurationResolverProvider::get(DB::ProxyConfiguration::Protocol::HTTP, *config)->resolve();
|
||||
|
||||
ASSERT_EQ(http_proxy_configuration.host, "");
|
||||
ASSERT_EQ(http_proxy_configuration.port, 0);
|
||||
|
||||
auto https_proxy_configuration = DB::ProxyConfigurationResolverProvider::get(DB::ProxyConfiguration::Protocol::HTTPS)->resolve();
|
||||
auto https_proxy_configuration = DB::ProxyConfigurationResolverProvider::get(DB::ProxyConfiguration::Protocol::HTTPS, *config)->resolve();
|
||||
|
||||
ASSERT_EQ(https_proxy_configuration.host, https_list_proxy_server.getHost());
|
||||
|
||||
@ -104,13 +105,13 @@ TEST_F(ProxyConfigurationResolverProviderTests, ListBoth)
|
||||
|
||||
context->setConfig(config);
|
||||
|
||||
auto http_proxy_configuration = DB::ProxyConfigurationResolverProvider::get(DB::ProxyConfiguration::Protocol::HTTP)->resolve();
|
||||
auto http_proxy_configuration = DB::ProxyConfigurationResolverProvider::get(DB::ProxyConfiguration::Protocol::HTTP, *config)->resolve();
|
||||
|
||||
ASSERT_EQ(http_proxy_configuration.host, http_list_proxy_server.getHost());
|
||||
ASSERT_EQ(http_proxy_configuration.protocol, DB::ProxyConfiguration::protocolFromString(http_list_proxy_server.getScheme()));
|
||||
ASSERT_EQ(http_proxy_configuration.port, http_list_proxy_server.getPort());
|
||||
|
||||
auto https_proxy_configuration = DB::ProxyConfigurationResolverProvider::get(DB::ProxyConfiguration::Protocol::HTTPS)->resolve();
|
||||
auto https_proxy_configuration = DB::ProxyConfigurationResolverProvider::get(DB::ProxyConfiguration::Protocol::HTTPS, *config)->resolve();
|
||||
|
||||
ASSERT_EQ(https_proxy_configuration.host, https_list_proxy_server.getHost());
|
||||
|
||||
|
@ -237,7 +237,6 @@ bool AsynchronousBoundedReadBuffer::nextImpl()
|
||||
/// In case of multiple files for the same file in clickhouse (i.e. log family)
|
||||
/// file_offset_of_buffer_end will not match getImplementationBufferOffset()
|
||||
/// so we use [impl->getImplementationBufferOffset(), impl->getFileSize()]
|
||||
chassert(file_offset_of_buffer_end >= impl->getFileOffsetOfBufferEnd());
|
||||
chassert(file_offset_of_buffer_end <= impl->getFileSize());
|
||||
|
||||
return bytes_read;
|
||||
@ -312,7 +311,8 @@ off_t AsynchronousBoundedReadBuffer::seek(off_t offset, int whence)
|
||||
|
||||
if (read_until_position && new_pos > *read_until_position)
|
||||
{
|
||||
ProfileEvents::increment(ProfileEvents::RemoteFSSeeksWithReset);
|
||||
if (!impl->seekIsCheap())
|
||||
ProfileEvents::increment(ProfileEvents::RemoteFSSeeksWithReset);
|
||||
file_offset_of_buffer_end = new_pos = *read_until_position; /// read_until_position is a non-included boundary.
|
||||
impl->seek(file_offset_of_buffer_end, SEEK_SET);
|
||||
return new_pos;
|
||||
@ -330,7 +330,8 @@ off_t AsynchronousBoundedReadBuffer::seek(off_t offset, int whence)
|
||||
}
|
||||
else
|
||||
{
|
||||
ProfileEvents::increment(ProfileEvents::RemoteFSSeeksWithReset);
|
||||
if (!impl->seekIsCheap())
|
||||
ProfileEvents::increment(ProfileEvents::RemoteFSSeeksWithReset);
|
||||
file_offset_of_buffer_end = new_pos;
|
||||
impl->seek(file_offset_of_buffer_end, SEEK_SET);
|
||||
}
|
||||
|
@ -1205,13 +1205,6 @@ off_t CachedOnDiskReadBufferFromFile::getPosition()
|
||||
return file_offset_of_buffer_end - available();
|
||||
}
|
||||
|
||||
void CachedOnDiskReadBufferFromFile::assertCorrectness() const
|
||||
{
|
||||
if (!CachedObjectStorage::canUseReadThroughCache(settings)
|
||||
&& !settings.read_from_filesystem_cache_if_exists_otherwise_bypass_cache)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cache usage is not allowed (query_id: {})", query_id);
|
||||
}
|
||||
|
||||
String CachedOnDiskReadBufferFromFile::getInfoForLog()
|
||||
{
|
||||
String current_file_segment_info;
|
||||
|
@ -64,7 +64,6 @@ private:
|
||||
using ImplementationBufferPtr = std::shared_ptr<ReadBufferFromFileBase>;
|
||||
|
||||
void initialize(size_t offset, size_t size);
|
||||
void assertCorrectness() const;
|
||||
|
||||
/**
|
||||
* Return a list of file segments ordered in ascending order. This list represents
|
||||
|
@ -108,6 +108,10 @@ bool FileSegmentRangeWriter::write(const char * data, size_t size, size_t offset
|
||||
data += size_to_write;
|
||||
}
|
||||
|
||||
size_t available_size = file_segment->range().size() - file_segment->getDownloadedSize();
|
||||
if (available_size == 0)
|
||||
completeFileSegment();
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
@ -196,15 +200,16 @@ CachedOnDiskWriteBufferFromFile::CachedOnDiskWriteBufferFromFile(
|
||||
const String & source_path_,
|
||||
const FileCache::Key & key_,
|
||||
const String & query_id_,
|
||||
const WriteSettings & settings_)
|
||||
const WriteSettings & settings_,
|
||||
std::shared_ptr<FilesystemCacheLog> cache_log_)
|
||||
: WriteBufferFromFileDecorator(std::move(impl_))
|
||||
, log(&Poco::Logger::get("CachedOnDiskWriteBufferFromFile"))
|
||||
, cache(cache_)
|
||||
, source_path(source_path_)
|
||||
, key(key_)
|
||||
, query_id(query_id_)
|
||||
, enable_cache_log(!query_id_.empty() && settings_.enable_filesystem_cache_log)
|
||||
, throw_on_error_from_cache(settings_.throw_on_error_from_cache)
|
||||
, cache_log(!query_id_.empty() && settings_.enable_filesystem_cache_log ? cache_log_ : nullptr)
|
||||
{
|
||||
}
|
||||
|
||||
@ -241,10 +246,6 @@ void CachedOnDiskWriteBufferFromFile::cacheData(char * data, size_t size, bool t
|
||||
|
||||
if (!cache_writer)
|
||||
{
|
||||
std::shared_ptr<FilesystemCacheLog> cache_log;
|
||||
if (enable_cache_log)
|
||||
cache_log = Context::getGlobalContextInstance()->getFilesystemCacheLog();
|
||||
|
||||
cache_writer = std::make_unique<FileSegmentRangeWriter>(cache.get(), key, cache_log, query_id, source_path);
|
||||
}
|
||||
|
||||
|
@ -73,7 +73,8 @@ public:
|
||||
const String & source_path_,
|
||||
const FileCache::Key & key_,
|
||||
const String & query_id_,
|
||||
const WriteSettings & settings_);
|
||||
const WriteSettings & settings_,
|
||||
std::shared_ptr<FilesystemCacheLog> cache_log_);
|
||||
|
||||
void nextImpl() override;
|
||||
|
||||
@ -91,12 +92,11 @@ private:
|
||||
size_t current_download_offset = 0;
|
||||
const String query_id;
|
||||
|
||||
bool enable_cache_log;
|
||||
|
||||
bool throw_on_error_from_cache;
|
||||
bool cache_in_error_state_or_disabled = false;
|
||||
|
||||
std::unique_ptr<FileSegmentRangeWriter> cache_writer;
|
||||
std::shared_ptr<FilesystemCacheLog> cache_log;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -113,7 +113,8 @@ std::unique_ptr<WriteBufferFromFileBase> CachedObjectStorage::writeObject( /// N
|
||||
implementation_buffer->getFileName(),
|
||||
key,
|
||||
CurrentThread::isInitialized() && CurrentThread::get().getQueryContext() ? std::string(CurrentThread::getQueryId()) : "",
|
||||
modified_write_settings);
|
||||
modified_write_settings,
|
||||
Context::getGlobalContextInstance()->getFilesystemCacheLog());
|
||||
}
|
||||
|
||||
return implementation_buffer;
|
||||
|
@ -509,13 +509,11 @@ std::unique_ptr<WriteBufferFromFileBase> DiskObjectStorage::writeFile(
|
||||
LOG_TEST(log, "Write file: {}", path);
|
||||
|
||||
auto transaction = createObjectStorageTransaction();
|
||||
auto result = transaction->writeFile(
|
||||
return transaction->writeFile(
|
||||
path,
|
||||
buf_size,
|
||||
mode,
|
||||
object_storage->getAdjustedSettingsFromMetadataFile(settings, path));
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
Strings DiskObjectStorage::getBlobPath(const String & path) const
|
||||
|
@ -20,6 +20,7 @@
|
||||
#include <IO/S3/PocoHTTPClientFactory.h>
|
||||
#include <IO/S3/AWSLogger.h>
|
||||
#include <IO/S3/Credentials.h>
|
||||
#include <Interpreters/Context.h>
|
||||
|
||||
#include <Common/assert_cast.h>
|
||||
|
||||
@ -866,7 +867,9 @@ PocoHTTPClientConfiguration ClientFactory::createClientConfiguration( // NOLINT
|
||||
const ThrottlerPtr & put_request_throttler,
|
||||
const String & protocol)
|
||||
{
|
||||
auto proxy_configuration_resolver = DB::ProxyConfigurationResolverProvider::get(DB::ProxyConfiguration::protocolFromString(protocol));
|
||||
auto context = Context::getGlobalContextInstance();
|
||||
chassert(context);
|
||||
auto proxy_configuration_resolver = DB::ProxyConfigurationResolverProvider::get(DB::ProxyConfiguration::protocolFromString(protocol), context->getConfigRef());
|
||||
|
||||
auto per_request_configuration = [=] () { return proxy_configuration_resolver->resolve(); };
|
||||
auto error_report = [=] (const DB::ProxyConfiguration & req) { proxy_configuration_resolver->errorReport(req); };
|
||||
|
@ -55,7 +55,7 @@ namespace ErrorCodes
|
||||
|
||||
FileCache::FileCache(const std::string & cache_name, const FileCacheSettings & settings)
|
||||
: max_file_segment_size(settings.max_file_segment_size)
|
||||
, bypass_cache_threshold(settings.enable_bypass_cache_with_threashold ? settings.bypass_cache_threashold : 0)
|
||||
, bypass_cache_threshold(settings.enable_bypass_cache_with_threshold ? settings.bypass_cache_threshold : 0)
|
||||
, boundary_alignment(settings.boundary_alignment)
|
||||
, background_download_threads(settings.background_download_threads)
|
||||
, metadata_download_threads(settings.load_metadata_threads)
|
||||
|
@ -39,10 +39,10 @@ void FileCacheSettings::loadFromConfig(const Poco::Util::AbstractConfiguration &
|
||||
enable_filesystem_query_cache_limit = config.getUInt64(config_prefix + ".enable_filesystem_query_cache_limit", false);
|
||||
cache_hits_threshold = config.getUInt64(config_prefix + ".cache_hits_threshold", FILECACHE_DEFAULT_HITS_THRESHOLD);
|
||||
|
||||
enable_bypass_cache_with_threashold = config.getUInt64(config_prefix + ".enable_bypass_cache_with_threashold", false);
|
||||
enable_bypass_cache_with_threshold = config.getUInt64(config_prefix + ".enable_bypass_cache_with_threshold", false);
|
||||
|
||||
if (config.has(config_prefix + ".bypass_cache_threashold"))
|
||||
bypass_cache_threashold = parseWithSizeSuffix<uint64_t>(config.getString(config_prefix + ".bypass_cache_threashold"));
|
||||
if (config.has(config_prefix + ".bypass_cache_threshold"))
|
||||
bypass_cache_threshold = parseWithSizeSuffix<uint64_t>(config.getString(config_prefix + ".bypass_cache_threshold"));
|
||||
|
||||
if (config.has(config_prefix + ".boundary_alignment"))
|
||||
boundary_alignment = parseWithSizeSuffix<uint64_t>(config.getString(config_prefix + ".boundary_alignment"));
|
||||
|
@ -22,8 +22,8 @@ struct FileCacheSettings
|
||||
size_t cache_hits_threshold = FILECACHE_DEFAULT_HITS_THRESHOLD;
|
||||
bool enable_filesystem_query_cache_limit = false;
|
||||
|
||||
bool enable_bypass_cache_with_threashold = false;
|
||||
size_t bypass_cache_threashold = FILECACHE_BYPASS_THRESHOLD;
|
||||
bool enable_bypass_cache_with_threshold = false;
|
||||
size_t bypass_cache_threshold = FILECACHE_BYPASS_THRESHOLD;
|
||||
|
||||
size_t boundary_alignment = FILECACHE_DEFAULT_FILE_SEGMENT_ALIGNMENT;
|
||||
size_t background_download_threads = FILECACHE_DEFAULT_BACKGROUND_DOWNLOAD_THREADS;
|
||||
|
@ -54,7 +54,7 @@ BlockIO InterpreterDescribeCacheQuery::execute()
|
||||
res_columns[i++]->insert(cache->getFileSegmentsNum());
|
||||
res_columns[i++]->insert(cache->getBasePath());
|
||||
res_columns[i++]->insert(settings.background_download_threads);
|
||||
res_columns[i++]->insert(settings.enable_bypass_cache_with_threashold);
|
||||
res_columns[i++]->insert(settings.enable_bypass_cache_with_threshold);
|
||||
|
||||
BlockIO res;
|
||||
size_t num_rows = res_columns[0]->size();
|
||||
|
@ -180,7 +180,7 @@ namespace
|
||||
{
|
||||
auto protocol = protocol_string == "https" ? ProxyConfigurationResolver::Protocol::HTTPS
|
||||
: ProxyConfigurationResolver::Protocol::HTTP;
|
||||
auto proxy_config = ProxyConfigurationResolverProvider::get(protocol)->resolve();
|
||||
auto proxy_config = ProxyConfigurationResolverProvider::get(protocol, Context::getGlobalContextInstance()->getConfigRef())->resolve();
|
||||
|
||||
return proxyConfigurationToPocoProxyConfiguration(proxy_config);
|
||||
}
|
||||
|
@ -13,8 +13,8 @@ SETTINGS min_bytes_for_wide_part = 10485760,
|
||||
type = cache,
|
||||
max_size = '128Mi',
|
||||
path = '${CLICKHOUSE_TEST_UNIQUE_NAME}_cache',
|
||||
enable_bypass_cache_with_threashold = 1,
|
||||
bypass_cache_threashold = 100,
|
||||
enable_bypass_cache_with_threshold = 1,
|
||||
bypass_cache_threshold = 100,
|
||||
delayed_cleanup_interval_ms = 100,
|
||||
disk = 's3_disk');
|
||||
INSERT INTO test SELECT number, toString(number) FROM numbers(100);
|
||||
|
@ -16,8 +16,8 @@ SETTINGS min_bytes_for_wide_part = 10485760,
|
||||
type = cache,
|
||||
max_size = '128Mi',
|
||||
path = '${CLICKHOUSE_TEST_UNIQUE_NAME}_cache',
|
||||
enable_bypass_cache_with_threashold = 1,
|
||||
bypass_cache_threashold = 100,
|
||||
enable_bypass_cache_with_threshold = 1,
|
||||
bypass_cache_threshold = 100,
|
||||
delayed_cleanup_interval_ms = 100,
|
||||
disk = 's3_disk');
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user